Posted to user@flink.apache.org by Stefano Bortoli <s....@gmail.com> on 2014/12/02 11:21:09 UTC

missing buffers, but far below computation

Hi all,

I have just hit a problem, stack trace at the bottom.

It seems that there are not enough buffers to complete the run of a process,
even though I am working far below the limit suggested by the formula
presented here:
http://flink.incubator.apache.org/docs/0.6-incubating/faq.html

I am running on 6 nodes, at most 6 tasks per machine, so 4*6*6^2=864 << 2048.
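
For reference, the arithmetic above can be written out as a small sketch. It assumes the rule of thumb has the form 4 * taskManagers * slotsPerTaskManager^2, which is what the numbers in this message suggest; the class and method names are hypothetical and not part of Flink.

```java
public class BufferRuleOfThumb {

    // Hypothetical helper, not a Flink API: estimates the number of network
    // buffers as 4 * taskManagers * slotsPerTaskManager^2 (assumed rule of thumb).
    static long requiredBuffers(int taskManagers, int slotsPerTaskManager) {
        return 4L * taskManagers * slotsPerTaskManager * slotsPerTaskManager;
    }

    public static void main(String[] args) {
        // 6 nodes with at most 6 tasks each, as described in the message.
        long estimate = requiredBuffers(6, 6);
        System.out.println("estimated buffers: " + estimate);

        // 2048 is the configured number of buffers the estimate is compared against.
        System.out.println("within 2048: " + (estimate <= 2048));
    }
}
```

The estimate only considers the cluster shape, not how many operators of the job run at the same time, so a long pipeline may need more buffers than it predicts.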

The job does a flatMap (grouped and distinct), and then two chained joins on
the output of the map. Then the output of the joins is filtered,
consolidated, and printed.

I tried restarting the cluster, in case of a possible leak, but it did not
help. Am I falling into a corner case of the rule of thumb, or is it possible
that there is something not working properly?

Notably, 2 nodes run just 2 tasks... so the equation changes a bit. Is
it possible that this is causing problems? Furthermore, the tasks are
running where HBase and Solr are running as well, so the number of threads
is quite relevant.

thanks a lot for the support! :-)

saluti,
Stefano

okkam-nano-2.okkam.it
Error: java.lang.Exception: Failed to deploy the task CHAIN Reduce(org.okkam.flink.maintenance.deduplication.blocking.RemoveDuplicateReduceGroupFunction) -> Combine(org.apache.flink.api.java.operators.DistinctOperator$DistinctFunction) (15/28) - execution #0 to slot SubSlot 5 (cab978f80c0cb7071136cd755e971be9 (5) - ALLOCATED/ALIVE): org.apache.flink.runtime.io.network.InsufficientResourcesException: okkam-nano-2.okkam.it has not enough buffers to safely execute CHAIN Reduce(org.okkam.flink.maintenance.deduplication.blocking.RemoveDuplicateReduceGroupFunction) -> Combine(org.apache.flink.api.java.operators.DistinctOperator$DistinctFunction) (36 buffers missing)
    at org.apache.flink.runtime.io.network.ChannelManager.ensureBufferAvailability(ChannelManager.java:262)
    at org.apache.flink.runtime.io.network.ChannelManager.register(ChannelManager.java:130)
    at org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:598)
    at sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:420)
    at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:947)

    at org.apache.flink.runtime.executiongraph.Execution$2.run(Execution.java:284)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Re: missing buffers, but far below computation

Posted by Stefano Bortoli <s....@gmail.com>.
Thanks Ufuk,

The number of slots in total is 28, as shown in the log. I have increased
the number of buffers to 4096, and the process is running fine. Here is my
program:

public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    MyTableInputFormat inputFormat = new MyTableInputFormat();

    DataSet<Tuple3<String, String, Long>> dataset =
            env.createInput(inputFormat).rebalance();

    DataSet<Tuple4<String, String, String, Boolean>> grouped = dataset
            .flatMap(new FindCandidateWithMatchFlagMapFunction<>())
            .groupBy(0, 1)
            .reduceGroup(new RemoveDuplicateReduceGroupFunction())
            .distinct();

    DataSet<Tuple6<String, String, String, Boolean, String, String>> joined = grouped
            .join(dataset).where(0).equalTo(0)
            .with(new Join1ToGetCandidates())
            .join(dataset).where(1).equalTo(0)
            .with(new Join2ToGetCandidates());

    DataSet<Tuple3<String, String, String>> duplicates = joined
            .filter(new SingleMatchFilterFunctionWithFlagMatch<>())
            .map(new MapToTuple3MapFunction<>());

    DataSet<Tuple2<StringValue, MyStringList>> duplicatesToprint = duplicates
            .distinct()
            .groupBy(0)
            .reduceGroup(new ConsolidateByTypeDuplicatesGroupReduceFunction());

    duplicatesToprint
            .writeAsText("file:///tmp/" + EnsMaintenanceConstants.WORKING_TABLE + "/duplicates.txt",
                    WriteMode.OVERWRITE)
            .setParallelism(1);

    // System.out.println(env.getExecutionPlan());
    env.execute();
}


saluti,
Stefano



2014-12-02 12:17 GMT+01:00 Ufuk Celebi <uc...@apache.org>:

> Hey Stefano,
>
> Your number of task slots per task manager is 6, right, i.e. 6 * 6 = 36
> slots in total? You can check the total number of available task slots in
> the job manager web interface.
>
> And from the log output: are you running all tasks with parallelism of 28?
>
> If you have a long pipeline where multiple tasks run at the same time, it
> is possible that you run into a corner case. Could you post your program
> without the user code, i.e. the data flow like SOURCE => FLATMAP =>
> DISTINCT etc.?
>
> In the meantime, you could try to increase the number of buffers to 4096
> (taskmanager.network.numberOfBuffers), which would cost you 128 MB of main
> memory per machine (4096 * 32 KB).
>
> – Ufuk

Re: missing buffers, but far below computation

Posted by Ufuk Celebi <uc...@apache.org>.
Hey Stefano,

Your number of task slots per task manager is 6, right, i.e. 6 * 6 = 36
slots in total? You can check the total number of available task slots in
the job manager web interface.

And from the log output: are you running all tasks with parallelism of 28?

If you have a long pipeline where multiple tasks run at the same time, it
is possible that you run into a corner case. Could you post your program
without the user code, i.e. the data flow like SOURCE => FLATMAP =>
DISTINCT etc.?

In the meantime, you could try to increase the number of buffers to 4096
(taskmanager.network.numberOfBuffers), which would cost you 128 MB of main
memory per machine (4096 * 32 KB).
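
As a rough sketch of that memory estimate, the snippet below multiplies the number of buffers by the 32 KB buffer size quoted above. The class and method names are hypothetical and not part of Flink; only the 4096 and 32 KB figures come from this message, and 2048 is the buffer count mentioned earlier in the thread.

```java
public class NetworkBufferMemory {

    // Hypothetical helper, not a Flink API: total memory in bytes taken by
    // the network buffers on one machine, given the buffer size in KB.
    static long bufferMemoryBytes(int numberOfBuffers, int bufferSizeKb) {
        return (long) numberOfBuffers * bufferSizeKb * 1024L;
    }

    public static void main(String[] args) {
        int bufferSizeKb = 32; // buffer size assumed from the message above

        for (int buffers : new int[] {2048, 4096}) {
            long megabytes = bufferMemoryBytes(buffers, bufferSizeKb) / (1024L * 1024L);
            System.out.println(buffers + " buffers -> " + megabytes + " MB per machine");
        }
    }
}
```

The value itself is set through the taskmanager.network.numberOfBuffers key in the Flink configuration on each machine.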

– Ufuk

