Posted to user@hama.apache.org by Roman Shapovalov <sh...@graphics.cs.msu.su> on 2013/09/30 14:54:51 UTC

readNext semantics and failure after cleanup

Hello all,

I am developing a toy master-slave application for the Python
streaming interface. There are two issues.

1. What are the semantics of the readNext command?

If I run 3 tasks -- one of them being a master that does not read input --
the slaves take turns reading records, but each of them reads only every
third record, e.g. slave#1 reads records 3, 6, 9, while slave#2 reads
2, 5, 8. So one third of the records are skipped, as if they had been
assigned to the master task.

So, what are the exact semantics? Is there a recommended way to make sure
every record is read by some task (other than the master)?
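
For illustration, here is a minimal self-contained Python sketch of the
behaviour described above. It does not use Hama at all; it simply assumes
that input records are handed to the three tasks round-robin by record
index and that the master never consumes its share, which is one assignment
scheme consistent with the numbers reported here (which task ends up with
which residue class depends on the peer ordering).

    # Simulation only, no Hama involved. Assumption: records are assigned to
    # the 3 tasks round-robin by record index, and the master never reads.
    NUM_TASKS = 3
    MASTER = 0                      # which peer acts as master is arbitrary here

    records = list(range(1, 10))    # records 1..9

    read_by = {task: [] for task in range(NUM_TASKS)}
    skipped = []
    for index, record in enumerate(records):
        task = index % NUM_TASKS    # assumed round-robin assignment
        if task == MASTER:
            skipped.append(record)  # the master's share is never consumed
        else:
            read_by[task].append(record)

    print(read_by)   # each slave sees only every third record
    print(skipped)   # one third of the input is never read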


2. After the code is executed (and the output is written), the job
fails. All the task logs contain the following text:

13/09/30 16:32:09 ERROR protocol.UplinkReader: java.lang.NullPointerException
    at org.apache.hama.pipes.protocol.UplinkReader.run(UplinkReader.java:127)

The exception is raised even if I don't use pipes at all. Since it shows up
after cleanup, it is not critical for my program, but it may indicate some
misuse on my part or a bug in the Hama code.

Please look at that issue. My code is attached.

Roman

Re: readNext semantics and failure after cleanup

Posted by Roman Shapovalov <sh...@graphics.cs.msu.su>.
Hi Martin,

Thank you for your support.

> Any chance to switch to Hama Pipes C++?

I don't think so. I have existing Python code to build a distributed
implementation from, and the Python wrapper was the primary reason for
choosing Hama.

Roman

On Mon, Oct 7, 2013 at 2:15 PM, Martin Illecker <mi...@apache.org> wrote:
> Hi Roman,
>
> sorry for the delay.
>
> So, the text protocol does not support it, or does it lack only in the
>> Python wrapper?
>
>
> Yes only the Python wrapper does not support it.
> The Hama Pipes protocol (C++) does support custom partitioning.
>
> I could reproduce the exception, which is occurring during the protocol
> shutdown only in Streaming API.
>
> 13/09/30 16:32:09 ERROR protocol.UplinkReader:
>> java.lang.NullPointerException
>>     at org.apache.hama.pipes.protocol.UplinkReader.run(
>> UplinkReader.java:127)
>
>
> It was a problem in the Python wrapper and I fixed it [1] in my github
> repository [2].
>
> If I send any message of the length L, I receive the message with
>> additional (L-1)/2 '^@' symbols after it.
>> So, I use the following workaround. In the BSPPeer.getCurrentMessage():
>>     return line[:len(line)-len(line)//3]
>> instead of
>>     return line
>
>
> I could not locate the cause [3] for your currentMessage length problem, so
> I committed your workaround [4].
>
> Then it seems the easiest way to work it
>> around is to have the master thread resend those records to slaves...
>> if they are not very big.
>>
>
> Yes this would be an option but partitioning should be preferred.
> Any chance to switch to Hama Pipes C++?
>
> Martin
>
> [1]
> https://github.com/millecker/HamaStreaming/commit/95466287e883a892f85427303ff255bc52b00b9a
> [2] https://github.com/millecker/HamaStreaming
> [3]
> https://github.com/apache/hama/blob/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java#L94
> [4]
> https://github.com/millecker/HamaStreaming/commit/12ceb0d2db5d088004566592e42431ee40558fa7
>
>
>
> 2013/10/1 Roman Shapovalov <sh...@graphics.cs.msu.su>
>
>> It seems that the file is lost in communication. Here is a copy:
>>
>> https://dl.dropboxusercontent.com/u/42489708/MasterSlaveBSP.py
>>
>> Roman
>>
>>
>> On Tue, Oct 1, 2013 at 6:13 PM, Roman Shapovalov
>> <sh...@graphics.cs.msu.su> wrote:
>> > Hi Martin,
>> >
>> >> it seems you have forgotten the attachment.
>> >
>> > I can see one in the message I sent. Attaching again, try this.
>> >
>> >
>> >> But currently the Hama Streaming API [2] does not support partitioning.
>> >
>> > So, the text protocol does not support it, or does it lack only in the
>> > Python wrapper?
>> >
>> > So, the default partitioning is arbitrary, regardless of who is
>> > reading and who is not? Then it seems the easiest way to work it
>> > around is to have the master thread resend those records to slaves...
>> > if they are not very big.
>> >
>> > Thanks,
>> > Roman
>> >
>> > On Tue, Oct 1, 2013 at 9:30 AM, Martin Illecker <mi...@apache.org>
>> wrote:
>> >> Hi Roman,
>> >>
>> >> it seems you have forgotten the attachment. (your code)
>> >>
>> >> ad 1)
>> >> I would solve this by using a custom partitioner.
>> >> A custom partitioner defines which records are distributed to which
>> tasks.
>> >>
>> >> Here is some C++ partitioner example [1].
>> >> e.g., key 3,6,9 partitioner should return 1
>> >> and  key 2,5,8 should return 2
>> >>
>> >> But currently the Hama Streaming API [2] does not support partitioning.
>> >> Only Hama Pipes C++ supports it.
>> >>
>> >> ad 2)
>> >> Please submit your code, I will have a look at this exception.
>> >> Or please submit the tasklog.
>> >>
>> >> Martin
>> >>
>> >> [1]
>> >>
>> https://github.com/apache/hama/blob/trunk/c%2B%2B/src/main/native/examples/impl/matrixmultiplication.cc#L131-138
>> >> [2]
>> >>
>> https://github.com/millecker/HamaStreaming/blob/1009bb1a6472d11f5dd3af9dc07fe64547dd0290/BinaryProtocol.py#L37-38
>> >>
>> >> 2013/9/30 Roman Shapovalov <sh...@graphics.cs.msu.su>
>> >>
>> >>> Hello all,
>> >>>
>> >>> I am developing a toy master-slave application for the Python
>> >>> streaming interface. There are two issues.
>> >>>
>> >>> 1. What is the semantics of the readNext command?
>> >>>
>> >>> If I run 3 tasks -- one of them is master who does not read input, --
>> >>> slaves take turn to read records, but each of them reads only each
>> >>> third example, e.g. slave#1 reads records 3,6,9, while slave#2 reads
>> >>> 2,5,8. So 1/3 of records are skipped, as if the master task would read
>> >>> them.
>> >>>
>> >>> So, what is the exact semantics? Is there any best practice to make
>> >>> each example read by some task (but not the master).
>> >>>
>> >>>
>> >>> 2. After the code is executed (and the output is written), the job
>> >>> fails. All the task logs contain the following text:
>> >>>
>> >>> 13/09/30 16:32:09 ERROR protocol.UplinkReader:
>> >>> java.lang.NullPointerException
>> >>>     at
>> >>> org.apache.hama.pipes.protocol.UplinkReader.run(UplinkReader.java:127)
>> >>>
>> >>> The exception is raised even if I don't use pipes at all. Since it
>> >>> shows up after cleanup, it is not critical for the program, but it may
>> >>> indicate some misuse by me or bugs in the Hama code.
>> >>>
>> >>> Please look at that issue. My code is attached.
>> >>>
>> >>> Roman
>> >>>
>>

Re: readNext semantics and failure after cleanup

Posted by Martin Illecker <mi...@apache.org>.
Hi Roman,

sorry for the delay.

So, the text protocol does not support it, or does it lack only in the
> Python wrapper?


Yes, it is only the Python wrapper that does not support it.
The Hama Pipes protocol (C++) does support custom partitioning.

I was able to reproduce the exception, which occurs during protocol shutdown
only in the Streaming API.

13/09/30 16:32:09 ERROR protocol.UplinkReader:
> java.lang.NullPointerException
>     at org.apache.hama.pipes.protocol.UplinkReader.run(
> UplinkReader.java:127)


It was a problem in the Python wrapper, and I have fixed it [1] in my GitHub
repository [2].

If I send any message of the length L, I receive the message with
> additional (L-1)/2 '^@' symbols after it.
> So, I use the following workaround. In the BSPPeer.getCurrentMessage():
>     return line[:len(line)-len(line)//3]
> instead of
>     return line


I could not locate the cause [3] of your currentMessage length problem, so
I committed your workaround [4].

Then it seems the easiest way to work it
> around is to have the master thread resend those records to slaves...
> if they are not very big.
>

Yes, this would be an option, but partitioning should be preferred.
Any chance to switch to Hama Pipes C++?
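
For reference, here is a rough, self-contained sketch of the redistribution
idea quoted above: the master reads its own input split and forwards every
record to the slaves. It runs against a stand-in FakePeer object rather than
the real HamaStreaming BSPPeer; readNext and getCurrentMessage appear in this
thread, but send, getAllPeerNames, the "master first" peer ordering and the
peer names are assumptions, and the real wrapper would also need sync()
barriers between supersteps.

    # Sketch: the master reads its input split and forwards every record to
    # the slaves round-robin; the slaves then drain their message queues.
    class FakePeer(object):
        """Stand-in for the HamaStreaming BSPPeer, for illustration only."""

        def __init__(self, name, all_peers, records, inboxes):
            self.name = name
            self.all_peers = all_peers     # all peer names, master first (assumed)
            self._records = iter(records)  # this peer's input split
            self._inboxes = inboxes        # shared dict: peer name -> message queue

        def getAllPeerNames(self):
            return self.all_peers

        def readNext(self):
            return next(self._records, None)

        def send(self, peer, message):
            self._inboxes[peer].append(message)

        def getCurrentMessage(self):
            queue = self._inboxes[self.name]
            return queue.pop(0) if queue else None

    def master_bsp(peer):
        slaves = peer.getAllPeerNames()[1:]
        index, record = 0, peer.readNext()
        while record is not None:
            peer.send(slaves[index % len(slaves)], record)  # round-robin to slaves
            index, record = index + 1, peer.readNext()
        # in the real wrapper a peer.sync() barrier would follow here

    def slave_bsp(peer):
        received, message = [], peer.getCurrentMessage()
        while message is not None:
            received.append(message)       # process the record here
            message = peer.getCurrentMessage()
        return received

    inboxes = {"master": [], "slave1": [], "slave2": []}
    peers = ["master", "slave1", "slave2"]
    master_bsp(FakePeer("master", peers, ["record%d" % i for i in range(1, 10)], inboxes))
    print(slave_bsp(FakePeer("slave1", peers, [], inboxes)))
    print(slave_bsp(FakePeer("slave2", peers, [], inboxes)))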

Martin

[1]
https://github.com/millecker/HamaStreaming/commit/95466287e883a892f85427303ff255bc52b00b9a
[2] https://github.com/millecker/HamaStreaming
[3]
https://github.com/apache/hama/blob/trunk/core/src/main/java/org/apache/hama/pipes/protocol/StreamingProtocol.java#L94
[4]
https://github.com/millecker/HamaStreaming/commit/12ceb0d2db5d088004566592e42431ee40558fa7



2013/10/1 Roman Shapovalov <sh...@graphics.cs.msu.su>

> It seems that the file is lost in communication. Here is a copy:
>
> https://dl.dropboxusercontent.com/u/42489708/MasterSlaveBSP.py
>
> Roman
>
>
> On Tue, Oct 1, 2013 at 6:13 PM, Roman Shapovalov
> <sh...@graphics.cs.msu.su> wrote:
> > Hi Martin,
> >
> >> it seems you have forgotten the attachment.
> >
> > I can see one in the message I sent. Attaching again, try this.
> >
> >
> >> But currently the Hama Streaming API [2] does not support partitioning.
> >
> > So, the text protocol does not support it, or does it lack only in the
> > Python wrapper?
> >
> > So, the default partitioning is arbitrary, regardless of who is
> > reading and who is not? Then it seems the easiest way to work it
> > around is to have the master thread resend those records to slaves...
> > if they are not very big.
> >
> > Thanks,
> > Roman
> >
> > On Tue, Oct 1, 2013 at 9:30 AM, Martin Illecker <mi...@apache.org>
> wrote:
> >> Hi Roman,
> >>
> >> it seems you have forgotten the attachment. (your code)
> >>
> >> ad 1)
> >> I would solve this by using a custom partitioner.
> >> A custom partitioner defines which records are distributed to which
> tasks.
> >>
> >> Here is some C++ partitioner example [1].
> >> e.g., key 3,6,9 partitioner should return 1
> >> and  key 2,5,8 should return 2
> >>
> >> But currently the Hama Streaming API [2] does not support partitioning.
> >> Only Hama Pipes C++ supports it.
> >>
> >> ad 2)
> >> Please submit your code, I will have a look at this exception.
> >> Or please submit the tasklog.
> >>
> >> Martin
> >>
> >> [1]
> >>
> https://github.com/apache/hama/blob/trunk/c%2B%2B/src/main/native/examples/impl/matrixmultiplication.cc#L131-138
> >> [2]
> >>
> https://github.com/millecker/HamaStreaming/blob/1009bb1a6472d11f5dd3af9dc07fe64547dd0290/BinaryProtocol.py#L37-38
> >>
> >> 2013/9/30 Roman Shapovalov <sh...@graphics.cs.msu.su>
> >>
> >>> Hello all,
> >>>
> >>> I am developing a toy master-slave application for the Python
> >>> streaming interface. There are two issues.
> >>>
> >>> 1. What is the semantics of the readNext command?
> >>>
> >>> If I run 3 tasks -- one of them is master who does not read input, --
> >>> slaves take turn to read records, but each of them reads only each
> >>> third example, e.g. slave#1 reads records 3,6,9, while slave#2 reads
> >>> 2,5,8. So 1/3 of records are skipped, as if the master task would read
> >>> them.
> >>>
> >>> So, what is the exact semantics? Is there any best practice to make
> >>> each example read by some task (but not the master).
> >>>
> >>>
> >>> 2. After the code is executed (and the output is written), the job
> >>> fails. All the task logs contain the following text:
> >>>
> >>> 13/09/30 16:32:09 ERROR protocol.UplinkReader:
> >>> java.lang.NullPointerException
> >>>     at
> >>> org.apache.hama.pipes.protocol.UplinkReader.run(UplinkReader.java:127)
> >>>
> >>> The exception is raised even if I don't use pipes at all. Since it
> >>> shows up after cleanup, it is not critical for the program, but it may
> >>> indicate some misuse by me or bugs in the Hama code.
> >>>
> >>> Please look at that issue. My code is attached.
> >>>
> >>> Roman
> >>>
>

Re: readNext semantics and failure after cleanup

Posted by Roman Shapovalov <sh...@graphics.cs.msu.su>.
Hi again,

I have found another possible bug in Streaming:
if I send any message of length L, I receive the message with
(L-1)/2 additional '^@' symbols appended to it.
So I use the following workaround in BSPPeer.getCurrentMessage():
    return line[:len(line)-len(line)//3]
instead of
    return line

This actually works. Could this be caused by a change in the protocol?
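
For the record, here is a small self-contained check of the arithmetic behind
this workaround. '^@' is the usual terminal rendering of a NUL byte, so the
padding is simulated below with '\x00'; the simulate_received and workaround
helpers are made up for this sketch and nothing here talks to the real
protocol.

    # A message of length L arrives with (L-1)//2 extra NUL bytes appended,
    # i.e. the received length is about 3/2 of the original, so dropping the
    # last len(line)//3 characters recovers the original message exactly.
    def simulate_received(message):
        return message + "\x00" * ((len(message) - 1) // 2)

    def workaround(line):
        # the slicing used in BSPPeer.getCurrentMessage() above
        return line[:len(line) - len(line) // 3]

    for original in ("hi", "hello", "a somewhat longer test message"):
        received = simulate_received(original)
        assert workaround(received) == original
        # if the padding really is NUL bytes, stripping them would be an
        # alternative: received.rstrip("\x00")
    print("the workaround recovers the original message in all three cases")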

Thanks,
Roman


On Tue, Oct 1, 2013 at 1:06 PM, Roman Shapovalov
<sh...@graphics.cs.msu.su> wrote:
> It seems that the file is lost in communication. Here is a copy:
>
> https://dl.dropboxusercontent.com/u/42489708/MasterSlaveBSP.py
>
> Roman
>
>
> On Tue, Oct 1, 2013 at 6:13 PM, Roman Shapovalov
> <sh...@graphics.cs.msu.su> wrote:
>> Hi Martin,
>>
>>> it seems you have forgotten the attachment.
>>
>> I can see one in the message I sent. Attaching again, try this.
>>
>>
>>> But currently the Hama Streaming API [2] does not support partitioning.
>>
>> So, the text protocol does not support it, or does it lack only in the
>> Python wrapper?
>>
>> So, the default partitioning is arbitrary, regardless of who is
>> reading and who is not? Then it seems the easiest way to work it
>> around is to have the master thread resend those records to slaves...
>> if they are not very big.
>>
>> Thanks,
>> Roman
>>
>> On Tue, Oct 1, 2013 at 9:30 AM, Martin Illecker <mi...@apache.org> wrote:
>>> Hi Roman,
>>>
>>> it seems you have forgotten the attachment. (your code)
>>>
>>> ad 1)
>>> I would solve this by using a custom partitioner.
>>> A custom partitioner defines which records are distributed to which tasks.
>>>
>>> Here is some C++ partitioner example [1].
>>> e.g., key 3,6,9 partitioner should return 1
>>> and  key 2,5,8 should return 2
>>>
>>> But currently the Hama Streaming API [2] does not support partitioning.
>>> Only Hama Pipes C++ supports it.
>>>
>>> ad 2)
>>> Please submit your code, I will have a look at this exception.
>>> Or please submit the tasklog.
>>>
>>> Martin
>>>
>>> [1]
>>> https://github.com/apache/hama/blob/trunk/c%2B%2B/src/main/native/examples/impl/matrixmultiplication.cc#L131-138
>>> [2]
>>> https://github.com/millecker/HamaStreaming/blob/1009bb1a6472d11f5dd3af9dc07fe64547dd0290/BinaryProtocol.py#L37-38
>>>
>>> 2013/9/30 Roman Shapovalov <sh...@graphics.cs.msu.su>
>>>
>>>> Hello all,
>>>>
>>>> I am developing a toy master-slave application for the Python
>>>> streaming interface. There are two issues.
>>>>
>>>> 1. What is the semantics of the readNext command?
>>>>
>>>> If I run 3 tasks -- one of them is master who does not read input, --
>>>> slaves take turn to read records, but each of them reads only each
>>>> third example, e.g. slave#1 reads records 3,6,9, while slave#2 reads
>>>> 2,5,8. So 1/3 of records are skipped, as if the master task would read
>>>> them.
>>>>
>>>> So, what is the exact semantics? Is there any best practice to make
>>>> each example read by some task (but not the master).
>>>>
>>>>
>>>> 2. After the code is executed (and the output is written), the job
>>>> fails. All the task logs contain the following text:
>>>>
>>>> 13/09/30 16:32:09 ERROR protocol.UplinkReader:
>>>> java.lang.NullPointerException
>>>>     at
>>>> org.apache.hama.pipes.protocol.UplinkReader.run(UplinkReader.java:127)
>>>>
>>>> The exception is raised even if I don't use pipes at all. Since it
>>>> shows up after cleanup, it is not critical for the program, but it may
>>>> indicate some misuse by me or bugs in the Hama code.
>>>>
>>>> Please look at that issue. My code is attached.
>>>>
>>>> Roman
>>>>

Re: readNext semantics and failure after cleanup

Posted by Roman Shapovalov <sh...@graphics.cs.msu.su>.
It seems that the file was lost in transit. Here is a copy:

https://dl.dropboxusercontent.com/u/42489708/MasterSlaveBSP.py

Roman


On Tue, Oct 1, 2013 at 6:13 PM, Roman Shapovalov
<sh...@graphics.cs.msu.su> wrote:
> Hi Martin,
>
>> it seems you have forgotten the attachment.
>
> I can see one in the message I sent. Attaching again, try this.
>
>
>> But currently the Hama Streaming API [2] does not support partitioning.
>
> So, the text protocol does not support it, or does it lack only in the
> Python wrapper?
>
> So, the default partitioning is arbitrary, regardless of who is
> reading and who is not? Then it seems the easiest way to work it
> around is to have the master thread resend those records to slaves...
> if they are not very big.
>
> Thanks,
> Roman
>
> On Tue, Oct 1, 2013 at 9:30 AM, Martin Illecker <mi...@apache.org> wrote:
>> Hi Roman,
>>
>> it seems you have forgotten the attachment. (your code)
>>
>> ad 1)
>> I would solve this by using a custom partitioner.
>> A custom partitioner defines which records are distributed to which tasks.
>>
>> Here is some C++ partitioner example [1].
>> e.g., key 3,6,9 partitioner should return 1
>> and  key 2,5,8 should return 2
>>
>> But currently the Hama Streaming API [2] does not support partitioning.
>> Only Hama Pipes C++ supports it.
>>
>> ad 2)
>> Please submit your code, I will have a look at this exception.
>> Or please submit the tasklog.
>>
>> Martin
>>
>> [1]
>> https://github.com/apache/hama/blob/trunk/c%2B%2B/src/main/native/examples/impl/matrixmultiplication.cc#L131-138
>> [2]
>> https://github.com/millecker/HamaStreaming/blob/1009bb1a6472d11f5dd3af9dc07fe64547dd0290/BinaryProtocol.py#L37-38
>>
>> 2013/9/30 Roman Shapovalov <sh...@graphics.cs.msu.su>
>>
>>> Hello all,
>>>
>>> I am developing a toy master-slave application for the Python
>>> streaming interface. There are two issues.
>>>
>>> 1. What is the semantics of the readNext command?
>>>
>>> If I run 3 tasks -- one of them is master who does not read input, --
>>> slaves take turn to read records, but each of them reads only each
>>> third example, e.g. slave#1 reads records 3,6,9, while slave#2 reads
>>> 2,5,8. So 1/3 of records are skipped, as if the master task would read
>>> them.
>>>
>>> So, what is the exact semantics? Is there any best practice to make
>>> each example read by some task (but not the master).
>>>
>>>
>>> 2. After the code is executed (and the output is written), the job
>>> fails. All the task logs contain the following text:
>>>
>>> 13/09/30 16:32:09 ERROR protocol.UplinkReader:
>>> java.lang.NullPointerException
>>>     at
>>> org.apache.hama.pipes.protocol.UplinkReader.run(UplinkReader.java:127)
>>>
>>> The exception is raised even if I don't use pipes at all. Since it
>>> shows up after cleanup, it is not critical for the program, but it may
>>> indicate some misuse by me or bugs in the Hama code.
>>>
>>> Please look at that issue. My code is attached.
>>>
>>> Roman
>>>

Re: readNext semantics and failure after cleanup

Posted by Roman Shapovalov <sh...@graphics.cs.msu.su>.
Hi Martin,

> it seems you have forgotten the attachment.

I can see one in the message I sent. I am attaching it again; please try this one.


> But currently the Hama Streaming API [2] does not support partitioning.

So, is it the text protocol that does not support it, or is the support
only missing from the Python wrapper?

So the default partitioning is arbitrary, regardless of which tasks are
reading and which are not? Then it seems the easiest workaround is to have
the master task resend those records to the slaves... provided they are not
very big.

Thanks,
Roman

On Tue, Oct 1, 2013 at 9:30 AM, Martin Illecker <mi...@apache.org> wrote:
> Hi Roman,
>
> it seems you have forgotten the attachment. (your code)
>
> ad 1)
> I would solve this by using a custom partitioner.
> A custom partitioner defines which records are distributed to which tasks.
>
> Here is some C++ partitioner example [1].
> e.g., key 3,6,9 partitioner should return 1
> and  key 2,5,8 should return 2
>
> But currently the Hama Streaming API [2] does not support partitioning.
> Only Hama Pipes C++ supports it.
>
> ad 2)
> Please submit your code, I will have a look at this exception.
> Or please submit the tasklog.
>
> Martin
>
> [1]
> https://github.com/apache/hama/blob/trunk/c%2B%2B/src/main/native/examples/impl/matrixmultiplication.cc#L131-138
> [2]
> https://github.com/millecker/HamaStreaming/blob/1009bb1a6472d11f5dd3af9dc07fe64547dd0290/BinaryProtocol.py#L37-38
>
> 2013/9/30 Roman Shapovalov <sh...@graphics.cs.msu.su>
>
>> Hello all,
>>
>> I am developing a toy master-slave application for the Python
>> streaming interface. There are two issues.
>>
>> 1. What is the semantics of the readNext command?
>>
>> If I run 3 tasks -- one of them is master who does not read input, --
>> slaves take turn to read records, but each of them reads only each
>> third example, e.g. slave#1 reads records 3,6,9, while slave#2 reads
>> 2,5,8. So 1/3 of records are skipped, as if the master task would read
>> them.
>>
>> So, what is the exact semantics? Is there any best practice to make
>> each example read by some task (but not the master).
>>
>>
>> 2. After the code is executed (and the output is written), the job
>> fails. All the task logs contain the following text:
>>
>> 13/09/30 16:32:09 ERROR protocol.UplinkReader:
>> java.lang.NullPointerException
>>     at
>> org.apache.hama.pipes.protocol.UplinkReader.run(UplinkReader.java:127)
>>
>> The exception is raised even if I don't use pipes at all. Since it
>> shows up after cleanup, it is not critical for the program, but it may
>> indicate some misuse by me or bugs in the Hama code.
>>
>> Please look at that issue. My code is attached.
>>
>> Roman
>>

Re: readNext semantics and failure after cleanup

Posted by Martin Illecker <mi...@apache.org>.
Hi Roman,

it seems you have forgotten the attachment (your code).

Regarding 1):
I would solve this by using a custom partitioner.
A custom partitioner defines which records are distributed to which tasks.

Here is a C++ partitioner example [1]: e.g., for keys 3, 6, 9 the
partitioner should return 1, and for keys 2, 5, 8 it should return 2.

But currently the Hama Streaming API [2] does not support partitioning.
Only Hama Pipes C++ supports it.
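
As a conceptual illustration only, the mapping in the example above can be
written as a plain function. A real custom partitioner would have to be
implemented against the Hama Pipes C++ interface [1], since the Streaming API
has no such hook; the partition function below is hypothetical, and routing
the remaining keys 1, 4, 7, ... to a slave is an arbitrary choice made here
so that the master task receives nothing.

    # Conceptual sketch, not a Hama interface: keys 3,6,9 go to task 1 and
    # keys 2,5,8 go to task 2, as in the example above.
    def partition(key):
        remainder = key % 3
        if remainder == 0:   # 3, 6, 9, ...
            return 1
        if remainder == 2:   # 2, 5, 8, ...
            return 2
        return 1             # 1, 4, 7, ...: also sent to a slave, so the
                             # master task (0) receives no records

    for key in range(1, 10):
        print("key", key, "-> task", partition(key))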

Regarding 2):
Please submit your code and I will have a look at this exception.
Alternatively, please submit the task log.

Martin

[1]
https://github.com/apache/hama/blob/trunk/c%2B%2B/src/main/native/examples/impl/matrixmultiplication.cc#L131-138
[2]
https://github.com/millecker/HamaStreaming/blob/1009bb1a6472d11f5dd3af9dc07fe64547dd0290/BinaryProtocol.py#L37-38

2013/9/30 Roman Shapovalov <sh...@graphics.cs.msu.su>

> Hello all,
>
> I am developing a toy master-slave application for the Python
> streaming interface. There are two issues.
>
> 1. What is the semantics of the readNext command?
>
> If I run 3 tasks -- one of them is master who does not read input, --
> slaves take turn to read records, but each of them reads only each
> third example, e.g. slave#1 reads records 3,6,9, while slave#2 reads
> 2,5,8. So 1/3 of records are skipped, as if the master task would read
> them.
>
> So, what is the exact semantics? Is there any best practice to make
> each example read by some task (but not the master).
>
>
> 2. After the code is executed (and the output is written), the job
> fails. All the task logs contain the following text:
>
> 13/09/30 16:32:09 ERROR protocol.UplinkReader:
> java.lang.NullPointerException
>     at
> org.apache.hama.pipes.protocol.UplinkReader.run(UplinkReader.java:127)
>
> The exception is raised even if I don't use pipes at all. Since it
> shows up after cleanup, it is not critical for the program, but it may
> indicate some misuse by me or bugs in the Hama code.
>
> Please look at that issue. My code is attached.
>
> Roman
>