Posted to user@flink.apache.org by Stefano Bortoli <s....@gmail.com> on 2014/12/02 19:07:22 UTC

No Space Left on Device

Hi guys,

A fairly long-running job failed with this No Space Left on Device exception,
but the machine's disk is not full at all.

okkam@okkam-nano-2:/opt/flink-0.8$ df
Filesystem     1K-blocks     Used Available Use% Mounted on
/dev/sdb2      223302236 22819504 189116588  11% /
none                   4        0         4   0% /sys/fs/cgroup
udev             8156864        4   8156860   1% /dev
tmpfs            1633520      524   1632996   1% /run
none                5120        0      5120   0% /run/lock
none             8167584        0   8167584   0% /run/shm
none              102400        0    102400   0% /run/user
/dev/sdb1         523248     3428    519820   1% /boot/efi
/dev/sda1      961302560  2218352 910229748   1% /media/data
cm_processes     8167584    12116   8155468   1% /run/cloudera-scm-agent/process

Is it possible that the temporary files were deleted after the failure? I
read that this happens, but found no confirmation. In any case, it is a 256 GB
SSD, and each of the 6 nodes has one.

Here is the stack trace:

16:37:59,581 ERROR org.apache.flink.runtime.operators.RegularPactTask            - Error in task code:  CHAIN Join (org.okkam.flink.maintenance.deduplication.consolidate.Join2ToGetCandidates) -> Filter (org.okkam.flink.maintenance.deduplication.match.SingleMatchFilterFunctionWithFlagMatch) -> Map (org.okkam.flink.maintenance.deduplication.match.MapToTuple3MapFunction) -> Combine(org.apache.flink.api.java.operators.DistinctOperator$DistinctFunction) (4/28)
java.io.IOException: The channel is erroneous.
    at org.apache.flink.runtime.io.disk.iomanager.ChannelAccess.checkErroneous(ChannelAccess.java:132)
    at org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter.writeBlock(BlockChannelWriter.java:73)
    at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:218)
    at org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:204)
    at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
    at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeByte(AbstractPagedOutputView.java:223)
    at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.write(AbstractPagedOutputView.java:173)
    at org.apache.flink.types.StringValue.writeString(StringValue.java:808)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:68)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:28)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:95)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
    at org.apache.flink.runtime.operators.hash.HashPartition.insertIntoProbeBuffer(HashPartition.java:269)
    at org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:474)
    at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:537)
    at org.apache.flink.runtime.operators.hash.BuildSecondHashMatchIterator.callWithNextKey(BuildSecondHashMatchIterator.java:106)
    at org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:148)
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:484)
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:359)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:246)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: No space left on device
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
    at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
    at sun.nio.ch.IOUtil.write(IOUtil.java:65)
    at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:205)
    at org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest.write(BlockChannelAccess.java:259)
    at org.apache.flink.runtime.io.disk.iomanager.IOManager$WriterThread.run(IOManager.java:636)
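
A minimal sketch for checking this while a job is running, rather than after it has failed: the df output above is a snapshot taken after the exception, so it cannot show whether a temp directory filled up during the join. The class below prints the usable space of every directory in a path list. The names TempDirSpace, parseDirs and report are hypothetical, and the list is assumed to use the platform path separator (':' on Linux), which is how Flink releases of this era are understood to accept several directories under the taskmanager.tmp.dirs key.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: reports free space for each directory in a path list. */
class TempDirSpace {

    /** Splits a list such as "/tmp:/media/data/tmp" on the platform path separator. */
    public static List<File> parseDirs(String pathList) {
        List<File> dirs = new ArrayList<>();
        for (String p : pathList.split(File.pathSeparator)) {
            String trimmed = p.trim();
            if (!trimmed.isEmpty()) {
                dirs.add(new File(trimmed));
            }
        }
        return dirs;
    }

    /** Builds a one-line report of usable versus total space for one directory. */
    public static String report(File dir) {
        long usableMb = dir.getUsableSpace() / (1024L * 1024L);
        long totalMb = dir.getTotalSpace() / (1024L * 1024L);
        return String.format("%s: %d MB usable of %d MB", dir.getPath(), usableMb, totalMb);
    }

    public static void main(String[] args) {
        // Assumption: with no argument, fall back to the JVM's default temp directory.
        String pathList = args.length > 0 ? args[0] : System.getProperty("java.io.tmpdir");
        for (File dir : parseDirs(pathList)) {
            System.out.println(report(dir));
        }
    }
}
```

Running it periodically on each node with the same directory list that the task manager logs at startup would show which directory, if any, runs out of space first.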

Re: No Space Left on Device

Posted by Stefano Bortoli <s....@gmail.com>.
JAXB and serialization are necessary for my business logic. I store the data
as byte[], which is a plain serialization of an XML string. On every read I
have to rebuild the objects using JAXB.

Kryo in Flink will make it easier to manage user-defined objects, I
guess.

saluti,
Stefano



2014-12-04 12:41 GMT+01:00 Stephan Ewen <se...@apache.org>:

> Hi Stefano!
>
> Good to hear that it is working for you!
>
> Just a heads up: Flink does not use JAXB or standard Java serialization for
> its data exchange. Java serialization is only used to deploy functions into
> the cluster (which is usually very fast). When we send records around, we
> have a special serialization stack that is absolutely competitive with Kryo
> on serialization speed. We are thinking of using Kryo, though, to deploy
> functions into the cluster in the future, to work around some of the
> constraints that Java serialization has.
>
> Greetings,
> Stephan
>
>
> On Thu, Dec 4, 2014 at 8:48 AM, Stefano Bortoli <s....@gmail.com>
> wrote:
>
>> The process completed in about 6h45m, much less than the previous
>> run. Most of the time is still taken by the 'blocking part'. I guess we
>> could just increase the redundancy of the SolrCloud indexes and reach
>> amazing performance. Furthermore, we did not apply any 'key
>> transformation' (reversing or generating a Long as ID), so we have further
>> margin for improvement. I also have the feeling that relying on
>> Kryo serialization to build the POJOs rather than old-school JAXB
>> marshalling/unmarshalling would give quite a boost, as we repeat the
>> operation at least 250M times. :-)
>>
>> Thanks a lot to everyone. Flink is making effective deduplication of a
>> very heterogeneous dataset of about 10M entries possible within
>> hours on a cluster of 6 cheap hardware nodes. :-)
>>
>> saluti,
>> Stefano
>>
>> 2014-12-03 18:31 GMT+01:00 Stefano Bortoli <s....@gmail.com>:
>>
>>> Hi all,
>>>
>>> thanks for the feedback. For the moment, I hope I have resolved the problem
>>> by compressing the string into a byte[] using a custom implementation of the
>>> Value interface and the LZ4 algorithm. This adds a little overhead to the
>>> processing of some steps, but it should reduce network traffic and the
>>> temporary space required on disk.
>>>
>>> I think the problem is due to the two joins moving around quite a bit of
>>> data. Essentially, I twice join something like 230 million tuples with a
>>> dataset of 9.2 million entries (~80GB). Compression seems to be working
>>> fine so far, even though I have not reached the critical point yet. I'll keep
>>> you posted on whether this workaround solves the problem.
>>>
>>> I applied the double join as an alternative to repeating 230M*2 single
>>> gets on HBase, even though that allowed us to complete the process in about
>>> 11h.
>>>
>>> thanks a lot to everyone again.
>>>
>>> saluti,
>>> Stefano
>>>
>>>
>>>
>>>
>>>
>>>
>>> 2014-12-03 18:02 GMT+01:00 Flavio Pompermaier <po...@okkam.it>:
>>>
>>>> I think I can answer on behalf of Stefano, who is busy right now. The
>>>> job failed because the temp folder was full on the job manager (which is
>>>> also a task manager).
>>>> We would like to understand how big the temp directory should be. Which
>>>> parameters should we consider for that computation?
>>>>
>>>>
>>>> On Wed, Dec 3, 2014 at 5:22 PM, Ufuk Celebi <uc...@apache.org> wrote:
>>>>
>>>>> The task managers log the temporary directories at start up. Can you
>>>>> have a look there and verify that you configured the temporary directories
>>>>> correctly?
>>>>>
>>>>> On Wed, Dec 3, 2014 at 5:17 PM, Stephan Ewen <se...@apache.org> wrote:
>>>>>
>>>>>> Hi!
>>>>>>
>>>>>> That exception means that one of the directories is full. If you have
>>>>>> several temp directories on different disks, you can add them all to the
>>>>>> config and the temp files will be rotated across the disks.
>>>>>>
>>>>>> The exception may come once the first temp directory is full. For
>>>>>> example, if you have 4 temp dirs (where 1 is rather full while the others
>>>>>> have a lot of space), it may be that one temp file on the full directory
>>>>>> grows large and exceeds the space, while the other directories have plenty
>>>>>> of space.
>>>>>>
>>>>>> Greetings,
>>>>>> Stephan
>>>>>>
>>>>>>
>>>>>> On Wed, Dec 3, 2014 at 4:40 PM, Robert Metzger <rm...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I think Flink is deleting its temporary files.
>>>>>>>
>>>>>>> Is the temp. path set to the SSD on each machine?
>>>>>>> What is the size of the two data sets you are joining? Your cluster
>>>>>>> has 6*256GB = 1.5 TB of temporary disk space.
>>>>>>> Maybe only the temp directory of one node is full?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Dec 3, 2014 at 3:52 PM, Ufuk Celebi <uc...@apache.org> wrote:
>>>>>>>
>>>>>>>> Hey Stefano,
>>>>>>>>
>>>>>>>> I would wait for Stephan's take on this, but with caught
>>>>>>>> IOExceptions the hash table should properly clean up after itself and
>>>>>>>> delete the file.
>>>>>>>>
>>>>>>>> Can you still reproduce this problem for your use case?
>>>>>>>>
>>>>>>>> – Ufuk

Re: No Space Left on Device

Posted by Stephan Ewen <se...@apache.org>.
Hi Stefano!

Good to hear that it is working for you!

Just a heads up: Flink does not use JAXB or standard Java serialization for
its data exchange. Java serialization is only used to deploy functions into
the cluster (which is usually very fast). When we send records around, we
have a special serialization stack that is absolutely competitive with Kryo
on serialization speed. We are thinking of using Kryo, though, to deploy
functions into the cluster in the future, to work around some of the
constraints that Java serialization has.

Greetings,
Stephan



Re: No Space Left on Device

Posted by Stefano Bortoli <s....@gmail.com>.
The process completed in about 6h45m, much less than the previous run.
Most of the time is still taken by the 'blocking part'. I guess we could
just increase the redundancy of the SolrCloud indexes and reach amazing
performance. Furthermore, we did not apply any 'key transformation'
(reversing or generating a Long as ID), so we have further margin for
improvement. I also have the feeling that relying on Kryo
serialization to build the POJOs rather than old-school JAXB
marshalling/unmarshalling would give quite a boost, as we repeat the
operation at least 250M times. :-)

Thanks a lot to everyone. Flink is making effective deduplication of a
very heterogeneous dataset of about 10M entries possible within hours on a
cluster of 6 cheap hardware nodes. :-)

saluti,
Stefano
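
A minimal sketch of the compression workaround discussed in this thread, under loudly stated assumptions: the actual implementation is not shown anywhere in the thread, so the class name CompressedString and all of its methods are hypothetical. It swaps LZ4 for java.util.zip.Deflater so that it runs with the JDK alone, and it uses java.io.DataOutput and java.io.DataInput in place of Flink's own view types. The write and read methods only mirror the general shape that a Flink Value implementation is understood to have.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

/** Hypothetical holder that keeps and ships a string in compressed form. */
class CompressedString {

    private byte[] compressed = new byte[0];
    private int originalLength = 0; // number of UTF-8 bytes before compression

    /** Compresses the UTF-8 bytes of the value (Deflater here, LZ4 in the thread). */
    public void set(String value) {
        byte[] raw = value.getBytes(StandardCharsets.UTF_8);
        Deflater deflater = new Deflater(Deflater.BEST_SPEED);
        deflater.setInput(raw);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[4096];
        while (!deflater.finished()) {
            int n = deflater.deflate(buffer);
            out.write(buffer, 0, n);
        }
        deflater.end();
        compressed = out.toByteArray();
        originalLength = raw.length;
    }

    /** Decompresses back to the original string. */
    public String get() throws IOException {
        byte[] raw = new byte[originalLength];
        Inflater inflater = new Inflater();
        inflater.setInput(compressed);
        try {
            int offset = 0;
            while (offset < raw.length) {
                int n = inflater.inflate(raw, offset, raw.length - offset);
                if (n == 0) {
                    throw new IOException("Truncated compressed data");
                }
                offset += n;
            }
        } catch (DataFormatException e) {
            throw new IOException("Corrupt compressed data", e);
        } finally {
            inflater.end();
        }
        return new String(raw, StandardCharsets.UTF_8);
    }

    /** Serializes only the compressed bytes, so less data is spilled or shipped. */
    public void write(DataOutput out) throws IOException {
        out.writeInt(originalLength);
        out.writeInt(compressed.length);
        out.write(compressed);
    }

    /** Restores the compressed bytes without decompressing them. */
    public void read(DataInput in) throws IOException {
        originalLength = in.readInt();
        compressed = new byte[in.readInt()];
        in.readFully(compressed);
    }

    public int compressedSize() {
        return compressed.length;
    }
}
```

The point of the design is that decompression only happens in get(), so records that are merely shuffled between nodes or spilled to temp files by a join stay compressed the whole time. The price is the extra CPU work in the steps that actually read the string.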


Re: No Space Left on Device

Posted by Stefano Bortoli <s....@gmail.com>.
Hi all,

thanks for the feedback. For the moment, I hope I have resolved the problem by
compressing the string into a byte[] using a custom implementation of the
Value interface and the LZ4 algorithm. This adds a little overhead to the
processing of some steps, but it should reduce network traffic and the
temporary space required on disk.
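
A minimal sketch of the idea, not the actual class used in the job. Two
things are assumptions made so the example runs with the JDK alone: the JDK's
Deflater/Inflater stand in for LZ4, and write/read take java.io.DataOutput and
java.io.DataInput, whereas a real Flink type would implement
org.apache.flink.types.Value. The class name CompressedString is hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

/**
 * Hypothetical value type that holds a string as compressed bytes.
 * Assumption: Deflater replaces LZ4, and plain java.io interfaces replace
 * Flink's serialization views, so the sketch is self-contained.
 */
class CompressedString {
    private byte[] compressed = new byte[0];
    private int originalLength = 0; // length of the uncompressed UTF-8 bytes

    CompressedString() {}

    CompressedString(String s) {
        setValue(s);
    }

    /** Compresses the UTF-8 bytes of the string and keeps only the result. */
    void setValue(String s) {
        byte[] raw = s.getBytes(StandardCharsets.UTF_8);
        Deflater deflater = new Deflater(Deflater.BEST_SPEED);
        deflater.setInput(raw);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        while (!deflater.finished()) {
            int n = deflater.deflate(buf);
            out.write(buf, 0, n);
        }
        deflater.end();
        compressed = out.toByteArray();
        originalLength = raw.length;
    }

    /** Decompresses back to the original string. */
    String getValue() {
        byte[] raw = new byte[originalLength];
        Inflater inflater = new Inflater();
        inflater.setInput(compressed);
        try {
            int off = 0;
            while (off < raw.length) {
                int n = inflater.inflate(raw, off, raw.length - off);
                if (n == 0) {
                    throw new IllegalStateException("truncated compressed data");
                }
                off += n;
            }
        } catch (DataFormatException e) {
            throw new IllegalStateException("corrupt compressed data", e);
        } finally {
            inflater.end();
        }
        return new String(raw, StandardCharsets.UTF_8);
    }

    /** Serialized form: original length, compressed length, compressed bytes. */
    void write(DataOutput out) throws IOException {
        out.writeInt(originalLength);
        out.writeInt(compressed.length);
        out.write(compressed);
    }

    void read(DataInput in) throws IOException {
        originalLength = in.readInt();
        compressed = new byte[in.readInt()];
        in.readFully(compressed);
    }
}
```

Only the compressed bytes are written, so whatever the runtime spills to
disk or sends over the network for this field shrinks by the compression
ratio, at the cost of compressing and decompressing on each access.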

I think the problem is due to the two joins moving around quite a bit of
data. Essentially I join twice something like 230 million tuples with a
dataset of 9.2 million entries (~80GB). Compression seems to be working
fine so far, even though I did not reach the critical point yet. I'll keep
you posted to let you know whether this workaround solved the problem.

I applied a double join as an alternative to repeating 230M*2 single gets
on HBase, even though this allowed the process to complete in about 11h.

thanks a lot to everyone again.

saluti,
Stefano






2014-12-03 18:02 GMT+01:00 Flavio Pompermaier <po...@okkam.it>:

> I think I can answer on behalf of Stefano that is busy right now..the job
> failed because on the job manager (that is also a task manager) the temp
> folder was full.
> We would like to understand how big should be the temp directory..which
> parameters should we consider to make that computation?

Re: No Space Left on Device

Posted by Flavio Pompermaier <po...@okkam.it>.
I think I can answer on behalf of Stefano, who is busy right now. The job
failed because the temp folder was full on the job manager (which is also a
task manager).
We would like to understand how big the temp directory should be. Which
parameters should we consider to make that computation?

On Wed, Dec 3, 2014 at 5:22 PM, Ufuk Celebi <uc...@apache.org> wrote:

> The task managers log the temporary directories at start up. Can you have
> a look there and verify that you configured the temporary directories
> correctly?

Re: No Space Left on Device

Posted by Ufuk Celebi <uc...@apache.org>.
The task managers log the temporary directories at start up. Can you have a
look there and verify that you configured the temporary directories
correctly?

On Wed, Dec 3, 2014 at 5:17 PM, Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> That exception means that one of the directories is full. If you have
> several temp directories on different disks, you can add them all to the
> config and the temp files will be rotated across the disks.
>
> The exception may come once the first temp directory is full. For example,
> if you have 4 temp dirs (where 1 is rather full while the others have a lot
> of space), it may be that one temp file on the full directory grows large
> and exceeds the space, while the other directories have plenty of space.
>
> Greetings,
> Stephan

Re: No Space Left on Device

Posted by Stephan Ewen <se...@apache.org>.
Hi!

That exception means that one of the directories is full. If you have
several temp directories on different disks, you can add them all to the
config and the temp files will be rotated across the disks.

The exception may come once the first temp directory is full. For example,
if you have 4 temp dirs (where 1 is rather full while the others have a lot
of space), it may be that one temp file on the full directory grows large
and exceeds the space, while the other directories have plenty of space.
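
To see which directory is the tight one, something like the following sketch
could be run on each node. It assumes the directory list is the value of the
taskmanager.tmp.dirs setting, given as paths joined by the platform path
separator (":" on Linux); the class and method names are hypothetical.

```java
import java.io.File;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

/** Hypothetical helper that reports usable space per temp directory. */
class TempDirSpace {

    /** Maps each directory in a separator-delimited list to its usable bytes. */
    static Map<String, Long> usableBytes(String tmpDirs) {
        Map<String, Long> result = new LinkedHashMap<>();
        for (String dir : tmpDirs.split(Pattern.quote(File.pathSeparator))) {
            String path = dir.trim();
            if (path.isEmpty()) {
                continue;
            }
            // getUsableSpace() returns 0 if the path does not name a partition,
            // so a missing or misconfigured directory shows up as 0 bytes.
            result.put(path, new File(path).getUsableSpace());
        }
        return result;
    }

    /** Returns the directory with the least usable space, or null if none. */
    static String tightest(Map<String, Long> usable) {
        String worst = null;
        long min = Long.MAX_VALUE;
        for (Map.Entry<String, Long> e : usable.entrySet()) {
            if (e.getValue() < min) {
                min = e.getValue();
                worst = e.getKey();
            }
        }
        return worst;
    }
}
```

Since a single growing temp file can only use the space of the directory it
lives in, the smallest value in this report is the one to watch, not the sum.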

Greetings,
Stephan


On Wed, Dec 3, 2014 at 4:40 PM, Robert Metzger <rm...@apache.org> wrote:

> Hi,
>
> I think Flink is deleting its temporary files.
>
> Is the temp. path set to the SSD on each machine?
> What is the size of the two data sets your are joining? Your cluster
> has 6*256GB = 1.5 TB of temporary disk space.
> Maybe only the temp directory of one node is full?

Re: No Space Left on Device

Posted by Robert Metzger <rm...@apache.org>.
Hi,

I think Flink is deleting its temporary files.

Is the temp. path set to the SSD on each machine?
What is the size of the two data sets you are joining? Your cluster
has 6*256GB = 1.5 TB of temporary disk space.
Maybe only the temp directory of one node is full?




On Wed, Dec 3, 2014 at 3:52 PM, Ufuk Celebi <uc...@apache.org> wrote:

> Hey Stefano,
>
> I would wait for Stephan's take on this, but with caught IOExceptions the
> hash table should properly clean up after itself and delete the file.
>
> Can you still reproduce this problem for your use case?
>
> – Ufuk
>
>
> On Tue, Dec 2, 2014 at 7:07 PM, Stefano Bortoli <s....@gmail.com>
> wrote:
>
>> Hi guys,
>>
>> a quite long process failed due to this No Space Left on Device
>> exception, but the machine disk is not full at all.
>>
>> okkam@okkam-nano-2:/opt/flink-0.8$ df
>> Filesystem     1K-blocks     Used Available Use% Mounted on
>> /dev/sdb2      223302236 22819504 189116588  11% /
>> none                   4        0         4   0% /sys/fs/cgroup
>> udev             8156864        4   8156860   1% /dev
>> tmpfs            1633520      524   1632996   1% /run
>> none                5120        0      5120   0% /run/lock
>> none             8167584        0   8167584   0% /run/shm
>> none              102400        0    102400   0% /run/user
>> /dev/sdb1         523248     3428    519820   1% /boot/efi
>> /dev/sda1      961302560  2218352 910229748   1% /media/data
>> cm_processes     8167584    12116   8155468   1%
>> /run/cloudera-scm-agent/process
>>
>> Is it possible that the temporary files were deleted 'after the problem'?
>> I read so, but there was no confirmation. However, it is a 256SSD disk.
>> Each of the 6 nodes has it.
>>
>> Here is the stack trace:
>>
>> 16:37:59,581 ERROR
>> org.apache.flink.runtime.operators.RegularPactTask            - Error in
>> task code:  CHAIN Join
>> (org.okkam.flink.maintenance.deduplication.consolidate.Join2ToGetCandidates)
>> -> Filter
>> (org.okkam.flink.maintenance.deduplication.match.SingleMatchFilterFunctionWithFlagMatch)
>> -> Map
>> (org.okkam.flink.maintenance.deduplication.match.MapToTuple3MapFunction) ->
>> Combine(org.apache.flink.api.java.operators.DistinctOperator$DistinctFunction)
>> (4/28)
>> java.io.IOException: The channel is erroneous.
>>     at
>> org.apache.flink.runtime.io.disk.iomanager.ChannelAccess.checkErroneous(ChannelAccess.java:132)
>>     at
>> org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter.writeBlock(BlockChannelWriter.java:73)
>>     at
>> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:218)
>>     at
>> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:204)
>>     at
>> org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
>>     at
>> org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeByte(AbstractPagedOutputView.java:223)
>>     at
>> org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.write(AbstractPagedOutputView.java:173)
>>     at
>> org.apache.flink.types.StringValue.writeString(StringValue.java:808)
>>     at
>> org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:68)
>>     at
>> org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:28)
>>     at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:95)
>>     at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
>>     at
>> org.apache.flink.runtime.operators.hash.HashPartition.insertIntoProbeBuffer(HashPartition.java:269)
>>     at
>> org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:474)
>>     at
>> org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:537)
>>     at
>> org.apache.flink.runtime.operators.hash.BuildSecondHashMatchIterator.callWithNextKey(BuildSecondHashMatchIterator.java:106)
>>     at
>> org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:148)
>>     at
>> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:484)
>>     at
>> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:359)
>>     at
>> org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:246)
>>     at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.io.IOException: No space left on device
>>     at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>>     at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
>>     at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>>     at sun.nio.ch.IOUtil.write(IOUtil.java:65)
>>     at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:205)
>>     at
>> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest.write(BlockChannelAccess.java:259)
>>     at
>> org.apache.flink.runtime.io.disk.iomanager.IOManager$WriterThread.run(IOManager.java:636)
>>
>>
>

Re: No Space Left on Device

Posted by Ufuk Celebi <uc...@apache.org>.
Hey Stefano,

I would wait for Stephan's take on this, but when an IOException is caught,
the hash table should clean up after itself and delete its temporary file.
That would explain why the disk no longer looks full once the job has failed.

Can you still reproduce this problem for your use case?
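
If you do try to reproduce it, it may help to sample the free space of the
temp directory while the job is running, since df after the failure only shows
the state after cleanup. Below is a minimal sketch of that idea in plain Java,
not Flink code: the class TempSpaceCheck and its methods are hypothetical
names, and it assumes the spill files go to the JVM's default temp directory
unless another directory is passed as the first argument. The second method
only illustrates the write-then-delete-in-finally pattern that makes the space
reappear after an error.

```java
import java.io.IOException;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper, not part of Flink.
public class TempSpaceCheck {

    // Bytes currently usable on the file store that holds the given directory.
    static long usableBytes(Path dir) throws IOException {
        FileStore store = Files.getFileStore(dir);
        return store.getUsableSpace();
    }

    // Writes a temporary "spill" file and deletes it in a finally block,
    // so the space is returned even if the write fails.
    // Returns true if the file still exists afterwards (expected: false).
    static boolean spillFileSurvives(Path dir, int bytes) throws IOException {
        Path spill = Files.createTempFile(dir, "spill-", ".tmp");
        try {
            Files.write(spill, new byte[bytes]);
        } finally {
            Files.deleteIfExists(spill);
        }
        return Files.exists(spill);
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        // Assumption: default JVM temp dir unless a directory is given.
        Path dir = Paths.get(args.length > 0 ? args[0]
                : System.getProperty("java.io.tmpdir"));
        // Sample once per second until the process is stopped.
        while (true) {
            System.out.println(System.currentTimeMillis() + " usable bytes in "
                    + dir + ": " + usableBytes(dir));
            Thread.sleep(1000);
        }
    }
}
```

Running the main method on each node during the job (for example
java TempSpaceCheck /path/to/tmp) would show whether the usable space on that
file store actually approaches zero before the exception is thrown.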

– Ufuk


On Tue, Dec 2, 2014 at 7:07 PM, Stefano Bortoli <s....@gmail.com> wrote:

> Hi guys,
>
> a quite long process failed due to this No Space Left on Device exception,
> but the machine disk is not full at all.
>
> okkam@okkam-nano-2:/opt/flink-0.8$ df
> Filesystem     1K-blocks     Used Available Use% Mounted on
> /dev/sdb2      223302236 22819504 189116588  11% /
> none                   4        0         4   0% /sys/fs/cgroup
> udev             8156864        4   8156860   1% /dev
> tmpfs            1633520      524   1632996   1% /run
> none                5120        0      5120   0% /run/lock
> none             8167584        0   8167584   0% /run/shm
> none              102400        0    102400   0% /run/user
> /dev/sdb1         523248     3428    519820   1% /boot/efi
> /dev/sda1      961302560  2218352 910229748   1% /media/data
> cm_processes     8167584    12116   8155468   1% /run/cloudera-scm-agent/process
>
> Is it possible that the temporary files were deleted 'after the problem'?
> I read so, but there was no confirmation. However, it is a 256SSD disk.
> Each of the 6 nodes has it.
>
> Here is the stack trace:
>
> 16:37:59,581 ERROR
> org.apache.flink.runtime.operators.RegularPactTask            - Error in
> task code:  CHAIN Join
> (org.okkam.flink.maintenance.deduplication.consolidate.Join2ToGetCandidates)
> -> Filter
> (org.okkam.flink.maintenance.deduplication.match.SingleMatchFilterFunctionWithFlagMatch)
> -> Map
> (org.okkam.flink.maintenance.deduplication.match.MapToTuple3MapFunction) ->
> Combine(org.apache.flink.api.java.operators.DistinctOperator$DistinctFunction)
> (4/28)
> java.io.IOException: The channel is erroneous.
>     at
> org.apache.flink.runtime.io.disk.iomanager.ChannelAccess.checkErroneous(ChannelAccess.java:132)
>     at
> org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter.writeBlock(BlockChannelWriter.java:73)
>     at
> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:218)
>     at
> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:204)
>     at
> org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
>     at
> org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeByte(AbstractPagedOutputView.java:223)
>     at
> org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.write(AbstractPagedOutputView.java:173)
>     at org.apache.flink.types.StringValue.writeString(StringValue.java:808)
>     at
> org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:68)
>     at
> org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:28)
>     at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:95)
>     at
> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
>     at
> org.apache.flink.runtime.operators.hash.HashPartition.insertIntoProbeBuffer(HashPartition.java:269)
>     at
> org.apache.flink.runtime.operators.hash.MutableHashTable.processProbeIter(MutableHashTable.java:474)
>     at
> org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:537)
>     at
> org.apache.flink.runtime.operators.hash.BuildSecondHashMatchIterator.callWithNextKey(BuildSecondHashMatchIterator.java:106)
>     at
> org.apache.flink.runtime.operators.MatchDriver.run(MatchDriver.java:148)
>     at
> org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:484)
>     at
> org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:359)
>     at
> org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:246)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: No space left on device
>     at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>     at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
>     at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>     at sun.nio.ch.IOUtil.write(IOUtil.java:65)
>     at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:205)
>     at
> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest.write(BlockChannelAccess.java:259)
>     at
> org.apache.flink.runtime.io.disk.iomanager.IOManager$WriterThread.run(IOManager.java:636)
>
>