Posted to user@flink.apache.org by Piotr Nowojski <pi...@data-artisans.com> on 2018/03/01 08:58:16 UTC

Re: Slow Flink program

Hi,

First of all, learn about what's going on with your job: check the status of the machines and the CPU/network usage on the cluster. If CPU is not at ~100%, analyse what is preventing the machines from working faster (network bottleneck, locking, blocking operations, etc.). If CPU is at ~100%, profile the TaskManagers to see what you can speed up.

A couple of questions about your example:
- You create CollectiveData instances with size 128000 by default. Doesn't that mean your records are gigantic? I can't tell, since you didn't provide the full code.
- You are mapping the data to new Tuple2<Integer, CollectiveData>(0, s) and then keying by the first field, which is always 0. Probably all of the records are ending up on one single machine (see the sketch below).
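
For example, here is a minimal sketch of what spreading the load could look like. It assumes a hypothetical getId() accessor on CollectiveData (your code doesn't show one) purely to derive a varying key, and it reuses your add() helper:

// Sketch only: a non-constant key spreads records over all 640 parallel
// reduce instances instead of funnelling them to a single one.
// getId() is a hypothetical method on CollectiveData, assumed here.
stringStream
    .map(new RichMapFunction<CollectiveData, Tuple2<Integer, CollectiveData>>() {
      @Override
      public Tuple2<Integer, CollectiveData> map(CollectiveData s) throws Exception {
        return new Tuple2<>(s.getId() % 640, s);  // varying key, not the constant 0
      }
    })
    .keyBy(0)  // now hashes records across many machines
    .reduce(new ReduceFunction<Tuple2<Integer, CollectiveData>>() {
      @Override
      public Tuple2<Integer, CollectiveData> reduce(Tuple2<Integer, CollectiveData> a,
                                                    Tuple2<Integer, CollectiveData> b) throws Exception {
        return new Tuple2<>(a.f0, add(a.f1, b.f1));
      }
    });

Note that this changes the semantics: you get one running aggregate per key rather than a single global one, so it only illustrates the distribution pattern.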

Piotrek

> On 28 Feb 2018, at 17:20, Supun Kamburugamuve <su...@gmail.com> wrote:
> 
> Hi, 
> 
> I'm trying to run a simple benchmark on Flink streaming reduce, and it seems very slow. Could you let me know if I'm doing something wrong?
> 
> Here is the program. I'm running it on 32 nodes with 20 tasks on each node, so the parallelism is 640.
> 
> import java.util.ArrayList;
> import java.util.List;
> 
> import org.apache.flink.api.common.functions.ReduceFunction;
> import org.apache.flink.api.common.functions.RichMapFunction;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.utils.ParameterTool;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
> import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
> 
> // CollectiveData is a user-defined class (not included in this post).
> public class StreamingReduce {
>   int size;
>   int iterations;
>   StreamExecutionEnvironment env;
>   String outFile;
> 
>   public StreamingReduce(int size, int iterations, StreamExecutionEnvironment env, String outFile) {
>     this.size = size;
>     this.iterations = iterations;
>     this.env = env;
>     this.outFile = outFile;
>   }
> 
>   public void execute() {
>     DataStream<CollectiveData> stringStream = env.addSource(new RichParallelSourceFunction<CollectiveData>() {
>       int i = 1;
>       int count = 0;
>       int size = 0;
>       int iterations = 10000;
> 
>       @Override
>       public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
>         ParameterTool p = (ParameterTool)
>             getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
>         size = p.getInt("size", 128000);
>         iterations = p.getInt("itr", 10000);
>         System.out.println("6666 iterations: " + iterations + " size: " + size);
>       }
> 
>       @Override
>       public void run(SourceContext<CollectiveData> sourceContext) throws Exception {
>         while (count < iterations) {
>           CollectiveData i = new CollectiveData(size);
>           sourceContext.collect(i);
>           count++;
>         }
>       }
> 
>       @Override
>       public void cancel() {
>       }
>     });
> 
>     // NOTE: every record is wrapped with the constant key 0 here
>     stringStream.map(new RichMapFunction<CollectiveData, Tuple2<Integer, CollectiveData>>() {
>       @Override
>       public Tuple2<Integer, CollectiveData> map(CollectiveData s) throws Exception {
>         return new Tuple2<Integer, CollectiveData>(0, s);
>       }
>     }).keyBy(0).reduce(new ReduceFunction<Tuple2<Integer, CollectiveData>>() {
>       @Override
>       public Tuple2<Integer, CollectiveData> reduce(Tuple2<Integer, CollectiveData> c1,
>                                                     Tuple2<Integer, CollectiveData> c2) throws Exception {
>         return new Tuple2<Integer, CollectiveData>(0, add(c1.f1, c2.f1));
>       }
>     }).addSink(new RichSinkFunction<Tuple2<Integer,CollectiveData>>() {
>       long start;
>       int count = 0;
>       int iterations;
> 
>       @Override
>       public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
>         ParameterTool p = (ParameterTool)
>             getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
>         iterations = p.getInt("itr", 10000);
>         System.out.println("7777 iterations: " + iterations);
>       }
> 
>       @Override
>       public void invoke(Tuple2<Integer, CollectiveData> integerStringTuple2) throws Exception {
>         if (count == 0) {
>           start = System.nanoTime();
>         }
>         count++;
>         if (count >= iterations) {
>           System.out.println("Final: " + count + " " + (System.nanoTime() - start) / 1000000 + " " + (integerStringTuple2.f1));
>         }
>       }
>     });
> 
>   }
> 
>   private static CollectiveData add(CollectiveData i, CollectiveData j) {
>     List<Integer> r = new ArrayList<>();
>     for (int k = 0; k < i.getList().size(); k++) {
>       r.add((i.getList().get(k) + j.getList().get(k)));
>     }
>     return new CollectiveData(r);
>   }
> }
> Thanks,
> Supun..
> 
> 


Re: Slow Flink program

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

If you didn't configure your program to use RocksDB, then you're already not using RocksDB. I think the main issue, as others have pointed out, is that by keying on a constant key you're essentially turning your program into a parallelism-of-1 program, thereby wasting almost all cluster resources.
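
To make that concrete, a minimal sketch of the heap-based default, set explicitly here only for illustration (without any setStateBackend call you effectively get this behaviour):

import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Keyed state stays on the JVM heap; no RocksDB (de)serialization round-trip.
env.setStateBackend(new MemoryStateBackend());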

Best,
Aljoscha

> On 1. Mar 2018, at 09:25, Supun Kamburugamuve <su...@gmail.com> wrote:
> 
> Is there a way to avoid going through RocksDB? For this test application it seems unnecessary, as we don't expect fault tolerance and this is a streaming case.
> 
> Thanks,
> Supun..


Re: Slow Flink program

Posted by Supun Kamburugamuve <su...@gmail.com>.
Is there a way to avoid going through RocksDB? For this test application it
seems unnecessary, as we don't expect fault tolerance and this is a
streaming case.

Thanks,
Supun..

On Thu, Mar 1, 2018 at 11:55 AM, Stephan Ewen <se...@apache.org> wrote:

> Few quick checks:
>
>   - Do you properly set the parallelism?
>   - If you start 640 tasks (parallelism), and you use the same key for
> everything, that behaves like parallelism 1 (Piotr mentioned this)
>
>   - Do you use the RocksDB state backend? If yes, try the FsStateBackend.
> It looks like your state data type (CollectiveData) is very expensive to
> serialize, and with RocksDB you get a back-and-forth serialization on every
> state access (off-heap => on-heap, compute, on-heap => off-heap).


-- 
Supun Kamburugamuve
Member, Apache Software Foundation; http://www.apache.org
E-mail: supun@apache.org; Mobile: +1 812 219 2563

Re: Slow Flink program

Posted by Stephan Ewen <se...@apache.org>.
Few quick checks:

  - Do you properly set the parallelism?
  - If you start 640 tasks (parallelism), and you use the same key for
everything, that behaves like parallelism 1 (Piotr mentioned this)

  - Do you use the RocksDB state backend? If yes, try the FsStateBackend.
It looks like your state data type (CollectiveData) is very expensive to
serialize, and with RocksDB you get a back-and-forth serialization on every
state access (off-heap => on-heap, compute, on-heap => off-heap).
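
A minimal sketch of that switch, assuming an illustrative local checkpoint URI and the env from the posted job:

import org.apache.flink.runtime.state.filesystem.FsStateBackend;

// FsStateBackend keeps working state on the JVM heap and only writes
// checkpoints to the file system, so the large CollectiveData objects
// are not (de)serialized on every state access as with RocksDB.
env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints"));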

On Thu, Mar 1, 2018 at 4:32 PM, Supun Kamburugamuve <su...@gmail.com>
wrote:

> Yes, the program runs fine; I can see it on the UI. Sorry, I didn't include
> the part where execute() is called.
>
> Thanks,
> Supun..

Re: Slow Flink program

Posted by Supun Kamburugamuve <su...@gmail.com>.
Yes, the program runs fine; I can see it on the UI. Sorry, I didn't include
the part where execute() is called.

Thanks,
Supun..

On Thu, Mar 1, 2018 at 10:27 AM, Fabian Hueske <fh...@gmail.com> wrote:

> Are you sure the program is doing anything at all?
> Do you call execute() on the StreamExecutionEnvironment?


-- 
Supun Kamburugamuve
Member, Apache Software Foundation; http://www.apache.org
E-mail: supun@apache.org; Mobile: +1 812 219 2563

Re: Slow Flink program

Posted by Fabian Hueske <fh...@gmail.com>.
Are you sure the program is doing anything at all?
Do you call execute() on the StreamExecutionEnvironment?
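
For reference, a sketch of a driver that would actually submit the job (argument values and the job name are illustrative, matching the StreamingReduce class posted earlier in the thread):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamingReduce job = new StreamingReduce(128000, 10000, env, "out.txt");
job.execute();                    // only builds the topology
env.execute("streaming-reduce");  // this call actually runs the job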

2018-03-01 15:55 GMT+01:00 Supun Kamburugamuve <su...@gmail.com>:

> Thanks Piotrek,
>
> I did it this way on purpose to see how Flink performs. With 128000
> messages it takes an unreasonable amount of time for Flink to complete the
> operation. With another framework the same operation completes in about 70
> seconds for 1000 messages of size 128000, while Flink takes hours.
>
> Thanks,
> Supun..

Re: Slow Flink program

Posted by Supun Kamburugamuve <su...@gmail.com>.
Thanks Piotrek,

I did it this way on purpose to see how Flink performs. With 128000
messages it takes an unreasonable amount of time for Flink to complete the
operation. With another framework the same operation completes in about 70
seconds for 1000 messages of size 128000, while Flink takes hours.

Thanks,
Supun..

On Thu, Mar 1, 2018 at 3:58 AM, Piotr Nowojski <pi...@data-artisans.com>
wrote:

> Hi,
>
> First of all, learn about what's going on with your job: check the status
> of the machines and the CPU/network usage on the cluster. If CPU is not at
> ~100%, analyse what is preventing the machines from working faster (network
> bottleneck, locking, blocking operations, etc.). If CPU is at ~100%, profile
> the TaskManagers to see what you can speed up.
>
> A couple of questions about your example:
> - You create CollectiveData instances with size 128000 by default. Doesn't
> that mean your records are gigantic? I can't tell, since you didn't provide
> the full code.
> - You are mapping the data to new Tuple2<Integer, CollectiveData>(0, s) and
> then keying by the first field, which is always 0. Probably all of the
> records are ending up on one single machine.
>
> Piotrek


-- 
Supun Kamburugamuve
Member, Apache Software Foundation; http://www.apache.org
E-mail: supun@apache.org; Mobile: +1 812 219 2563