You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Flavio Pompermaier <po...@okkam.it> on 2016/04/07 12:37:50 UTC

Re: threads, parallelism and task managers

We've finally created a running example (For Flink 0.10.2) of our improved
JDBC imputformat that you can run from an IDE (it creates an in-memory
derby database with 1000 rows and batch of 10) at
https://gist.github.com/fpompermaier/bcd704abc93b25b6744ac76ac17ed351.
The first time you run the program you have to comment the following line:

        stmt.executeUpdate("Drop Table users ");

In your pom declare the following dependencies:

<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<version>10.10.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.4.2</version>
</dependency>

In my laptop I have 8 cores and if I put parallelism to 16 I expect to see
16 calls to the connection pool (i.e. '==================== CREATING NEW
CONNECTION!') while I see only 8 (up to my maximum number of cores).
The number of created task instead is correct (16).

I hope this could help in understanding where the problem is!

Best and thank in advance,
Flavio

On Wed, Mar 30, 2016 at 11:01 AM, Stefano Bortoli <s....@gmail.com>
wrote:

> Hi Ufuk,
>
> here is our preliminary input formar implementation:
> https://gist.github.com/anonymous/dbf05cad2a6cc07b8aa88e74a2c23119
>
> if you need a running project, I will have to create a test one cause I
> cannot share the current configuration.
>
> thanks a lot in advance!
>
>
>
> 2016-03-30 10:13 GMT+02:00 Ufuk Celebi <uc...@apache.org>:
>
>> Do you have the code somewhere online? Maybe someone can have a quick
>> look over it later. I'm pretty sure that is indeed a problem with the
>> custom input format.
>>
>> – Ufuk
>>
>> On Tue, Mar 29, 2016 at 3:50 PM, Stefano Bortoli <s....@gmail.com>
>> wrote:
>> > Perhaps there is a misunderstanding on my side over the parallelism and
>> > split management given a data source.
>> >
>> > We started from the current JDBCInputFormat to make it multi-thread.
>> Then,
>> > given a space of keys, we create the splits based on a fetchsize set as
>> a
>> > parameter. In the open, we get a connection from the pool, and execute a
>> > query using the split interval. This sets the 'resultSet', and then the
>> > DatasourceTask iterates between reachedEnd, next and close. On close,
>> the
>> > connection is returned to the pool. We set parallelism to 32, and we
>> would
>> > expect 32 connection opened but the connections opened are just 8.
>> >
>> > We tried to make an example with the textinputformat, but being a
>> > delimitedinpurformat, the open is called sequentially when statistics
>> are
>> > built, and then the processing is executed in parallel just after all
>> the
>> > open are executed. This is not feasible in our case, because there
>> would be
>> > millions of queries before the statistics are collected.
>> >
>> > Perhaps we are doing something wrong, still to figure out what. :-/
>> >
>> > thanks a lot for your help.
>> >
>> > saluti,
>> > Stefano
>> >
>> >
>> > 2016-03-29 13:30 GMT+02:00 Stefano Bortoli <s....@gmail.com>:
>> >>
>> >> That is exactly my point. I should have 32 threads running, but I have
>> >> only 8. 32 Task are created, but only only 8 are run concurrently.
>> Flavio
>> >> and I will try to make a simple program to produce the problem. If we
>> solve
>> >> our issues on the way, we'll let you know.
>> >>
>> >> thanks a lot anyway.
>> >>
>> >> saluti,
>> >> Stefano
>> >>
>> >> 2016-03-29 12:44 GMT+02:00 Till Rohrmann <tr...@apache.org>:
>> >>>
>> >>> Then it shouldn’t be a problem. The ExeuctionContetxt is used to run
>> >>> futures and their callbacks. But as Ufuk said, each task will spawn
>> it’s own
>> >>> thread and if you set the parallelism to 32 then you should have 32
>> threads
>> >>> running.
>> >>>
>> >>>
>> >>> On Tue, Mar 29, 2016 at 12:29 PM, Stefano Bortoli <
>> s.bortoli@gmail.com>
>> >>> wrote:
>> >>>>
>> >>>> In fact, I don't use it. I just had to crawl back the runtime
>> >>>> implementation to get to the point where parallelism was switching
>> from 32
>> >>>> to 8.
>> >>>>
>> >>>> saluti,
>> >>>> Stefano
>> >>>>
>> >>>> 2016-03-29 12:24 GMT+02:00 Till Rohrmann <ti...@gmail.com>:
>> >>>>>
>> >>>>> Hi,
>> >>>>>
>> >>>>> for what do you use the ExecutionContext? That should actually be
>> >>>>> something which you shouldn’t be concerned with since it is only
>> used
>> >>>>> internally by the runtime.
>> >>>>>
>> >>>>> Cheers,
>> >>>>> Till
>> >>>>>
>> >>>>>
>> >>>>> On Tue, Mar 29, 2016 at 12:09 PM, Stefano Bortoli <
>> s.bortoli@gmail.com>
>> >>>>> wrote:
>> >>>>>>
>> >>>>>> Well, in theory yes. Each task has a thread, but only a number is
>> run
>> >>>>>> in parallel (the job of the scheduler).  Parallelism is set in the
>> >>>>>> environment. However, whereas the parallelism parameter is set and
>> read
>> >>>>>> correctly, when it comes to actual starting of the threads, the
>> number is
>> >>>>>> fix to 8. We run a debugger to get to the point where the thread
>> was
>> >>>>>> started. As Flavio mentioned, the ExecutionContext has the
>> parallelims set
>> >>>>>> to 8. We have a pool of connections to a RDBS and il logs the
>> creation of
>> >>>>>> just 8 connections although parallelism is much higher.
>> >>>>>>
>> >>>>>> My question is whether this is a bug (or a feature) of the
>> >>>>>> LocalMiniCluster. :-) I am not scala expert, but I see some
>> variable
>> >>>>>> assignment in setting up of the MiniCluster, involving parallelism
>> and
>> >>>>>> 'default values'. Default values in terms of parallelism are based
>> on the
>> >>>>>> number of cores.
>> >>>>>>
>> >>>>>> thanks a lot for the support!
>> >>>>>>
>> >>>>>> saluti,
>> >>>>>> Stefano
>> >>>>>>
>> >>>>>> 2016-03-29 11:51 GMT+02:00 Ufuk Celebi <uc...@apache.org>:
>> >>>>>>>
>> >>>>>>> Hey Stefano,
>> >>>>>>>
>> >>>>>>> this should work by setting the parallelism on the environment,
>> e.g.
>> >>>>>>>
>> >>>>>>> env.setParallelism(32)
>> >>>>>>>
>> >>>>>>> Is this what you are doing?
>> >>>>>>>
>> >>>>>>> The task threads are not part of a pool, but each submitted task
>> >>>>>>> creates its own Thread.
>> >>>>>>>
>> >>>>>>> – Ufuk
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> On Fri, Mar 25, 2016 at 9:10 PM, Flavio Pompermaier
>> >>>>>>> <po...@okkam.it> wrote:
>> >>>>>>> > Any help here? I think that the problem is that the JobManager
>> >>>>>>> > creates the
>> >>>>>>> > executionContext of the scheduler with
>> >>>>>>> >
>> >>>>>>> >        val executionContext = ExecutionContext.fromExecutor(new
>> >>>>>>> > ForkJoinPool())
>> >>>>>>> >
>> >>>>>>> > and thus the number of concurrently running threads is limited
>> to
>> >>>>>>> > the number
>> >>>>>>> > of cores (using the default constructor of the ForkJoinPool).
>> >>>>>>> > What do you think?
>> >>>>>>> >
>> >>>>>>> >
>> >>>>>>> > On Wed, Mar 23, 2016 at 6:55 PM, Stefano Bortoli
>> >>>>>>> > <s....@gmail.com>
>> >>>>>>> > wrote:
>> >>>>>>> >>
>> >>>>>>> >> Hi guys,
>> >>>>>>> >>
>> >>>>>>> >> I am trying to test a job that should run a number of tasks to
>> >>>>>>> >> read from a
>> >>>>>>> >> RDBMS using an improved JDBC connector. The connection and the
>> >>>>>>> >> reading run
>> >>>>>>> >> smoothly, but I cannot seem to be able to move above the limit
>> of
>> >>>>>>> >> 8
>> >>>>>>> >> concurrent threads running. 8 is of course the number of cores
>> of
>> >>>>>>> >> my
>> >>>>>>> >> machine.
>> >>>>>>> >>
>> >>>>>>> >> I have tried working around configurations and settings, but
>> the
>> >>>>>>> >> Executor
>> >>>>>>> >> within the ExecutionContext keeps on having a parallelism of 8.
>> >>>>>>> >> Although, of
>> >>>>>>> >> course, the parallelism of the execution environment is much
>> >>>>>>> >> higher (in fact
>> >>>>>>> >> I have many more tasks to be allocated).
>> >>>>>>> >>
>> >>>>>>> >> I feel it may be an issue of the LocalMiniCluster configuration
>> >>>>>>> >> that may
>> >>>>>>> >> just override/neglect my wish for higher degree of
>> parallelism. Is
>> >>>>>>> >> there a
>> >>>>>>> >> way for me to work around this issue?
>> >>>>>>> >>
>> >>>>>>> >> please let me know. Thanks a lot for you help! :-)
>> >>>>>>> >>
>> >>>>>>> >> saluti,
>> >>>>>>> >> Stefano
>> >>>>>>> >
>> >>>>>>> >
>> >>>>>>> >
>> >>>>>>
>> >>>>>>
>> >>>>>
>> >>>>
>> >>>
>> >>
>> >
>>
>
>

Re: threads, parallelism and task managers

Posted by Stephan Ewen <se...@apache.org>.
No problem ;-)

On Wed, Apr 13, 2016 at 11:38 AM, Stefano Bortoli <s....@gmail.com>
wrote:

> Sounds you are damn right! thanks for the insight, dumb on us for not
> checking this before.
>
> saluti,
> Stefano
>
> 2016-04-13 11:05 GMT+02:00 Stephan Ewen <se...@apache.org>:
>
>> Sounds actually not like a Flink issue. I would look into the commons
>> pool docs.
>> Maybe they size their pools by default with the number of cores, so the
>> pool has only 8 threads, and other requests are queues?
>>
>> On Wed, Apr 13, 2016 at 10:29 AM, Flavio Pompermaier <
>> pompermaier@okkam.it> wrote:
>>
>>> Any feedback about our JDBC InputFormat issue..?
>>>
>>> On Thu, Apr 7, 2016 at 12:37 PM, Flavio Pompermaier <
>>> pompermaier@okkam.it> wrote:
>>>
>>>> We've finally created a running example (For Flink 0.10.2) of our
>>>> improved JDBC imputformat that you can run from an IDE (it creates an
>>>> in-memory derby database with 1000 rows and batch of 10) at
>>>> https://gist.github.com/fpompermaier/bcd704abc93b25b6744ac76ac17ed351.
>>>> The first time you run the program you have to comment the following
>>>> line:
>>>>
>>>>         stmt.executeUpdate("Drop Table users ");
>>>>
>>>> In your pom declare the following dependencies:
>>>>
>>>> <dependency>
>>>> <groupId>org.apache.derby</groupId>
>>>> <artifactId>derby</artifactId>
>>>> <version>10.10.1.1</version>
>>>> </dependency>
>>>> <dependency>
>>>> <groupId>org.apache.commons</groupId>
>>>> <artifactId>commons-pool2</artifactId>
>>>> <version>2.4.2</version>
>>>> </dependency>
>>>>
>>>> In my laptop I have 8 cores and if I put parallelism to 16 I expect to
>>>> see 16 calls to the connection pool (i.e. '==================== CREATING
>>>> NEW CONNECTION!') while I see only 8 (up to my maximum number of cores).
>>>> The number of created task instead is correct (16).
>>>>
>>>> I hope this could help in understanding where the problem is!
>>>>
>>>> Best and thank in advance,
>>>> Flavio
>>>>
>>>> On Wed, Mar 30, 2016 at 11:01 AM, Stefano Bortoli <s....@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Ufuk,
>>>>>
>>>>> here is our preliminary input formar implementation:
>>>>> https://gist.github.com/anonymous/dbf05cad2a6cc07b8aa88e74a2c23119
>>>>>
>>>>> if you need a running project, I will have to create a test one cause
>>>>> I cannot share the current configuration.
>>>>>
>>>>> thanks a lot in advance!
>>>>>
>>>>>
>>>>>
>>>>> 2016-03-30 10:13 GMT+02:00 Ufuk Celebi <uc...@apache.org>:
>>>>>
>>>>>> Do you have the code somewhere online? Maybe someone can have a quick
>>>>>> look over it later. I'm pretty sure that is indeed a problem with the
>>>>>> custom input format.
>>>>>>
>>>>>> – Ufuk
>>>>>>
>>>>>> On Tue, Mar 29, 2016 at 3:50 PM, Stefano Bortoli <s....@gmail.com>
>>>>>> wrote:
>>>>>> > Perhaps there is a misunderstanding on my side over the parallelism
>>>>>> and
>>>>>> > split management given a data source.
>>>>>> >
>>>>>> > We started from the current JDBCInputFormat to make it
>>>>>> multi-thread. Then,
>>>>>> > given a space of keys, we create the splits based on a fetchsize
>>>>>> set as a
>>>>>> > parameter. In the open, we get a connection from the pool, and
>>>>>> execute a
>>>>>> > query using the split interval. This sets the 'resultSet', and then
>>>>>> the
>>>>>> > DatasourceTask iterates between reachedEnd, next and close. On
>>>>>> close, the
>>>>>> > connection is returned to the pool. We set parallelism to 32, and
>>>>>> we would
>>>>>> > expect 32 connection opened but the connections opened are just 8.
>>>>>> >
>>>>>> > We tried to make an example with the textinputformat, but being a
>>>>>> > delimitedinpurformat, the open is called sequentially when
>>>>>> statistics are
>>>>>> > built, and then the processing is executed in parallel just after
>>>>>> all the
>>>>>> > open are executed. This is not feasible in our case, because there
>>>>>> would be
>>>>>> > millions of queries before the statistics are collected.
>>>>>> >
>>>>>> > Perhaps we are doing something wrong, still to figure out what. :-/
>>>>>> >
>>>>>> > thanks a lot for your help.
>>>>>> >
>>>>>> > saluti,
>>>>>> > Stefano
>>>>>> >
>>>>>> >
>>>>>> > 2016-03-29 13:30 GMT+02:00 Stefano Bortoli <s....@gmail.com>:
>>>>>> >>
>>>>>> >> That is exactly my point. I should have 32 threads running, but I
>>>>>> have
>>>>>> >> only 8. 32 Task are created, but only only 8 are run concurrently.
>>>>>> Flavio
>>>>>> >> and I will try to make a simple program to produce the problem. If
>>>>>> we solve
>>>>>> >> our issues on the way, we'll let you know.
>>>>>> >>
>>>>>> >> thanks a lot anyway.
>>>>>> >>
>>>>>> >> saluti,
>>>>>> >> Stefano
>>>>>> >>
>>>>>> >> 2016-03-29 12:44 GMT+02:00 Till Rohrmann <tr...@apache.org>:
>>>>>> >>>
>>>>>> >>> Then it shouldn’t be a problem. The ExeuctionContetxt is used to
>>>>>> run
>>>>>> >>> futures and their callbacks. But as Ufuk said, each task will
>>>>>> spawn it’s own
>>>>>> >>> thread and if you set the parallelism to 32 then you should have
>>>>>> 32 threads
>>>>>> >>> running.
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> On Tue, Mar 29, 2016 at 12:29 PM, Stefano Bortoli <
>>>>>> s.bortoli@gmail.com>
>>>>>> >>> wrote:
>>>>>> >>>>
>>>>>> >>>> In fact, I don't use it. I just had to crawl back the runtime
>>>>>> >>>> implementation to get to the point where parallelism was
>>>>>> switching from 32
>>>>>> >>>> to 8.
>>>>>> >>>>
>>>>>> >>>> saluti,
>>>>>> >>>> Stefano
>>>>>> >>>>
>>>>>> >>>> 2016-03-29 12:24 GMT+02:00 Till Rohrmann <
>>>>>> till.rohrmann@gmail.com>:
>>>>>> >>>>>
>>>>>> >>>>> Hi,
>>>>>> >>>>>
>>>>>> >>>>> for what do you use the ExecutionContext? That should actually
>>>>>> be
>>>>>> >>>>> something which you shouldn’t be concerned with since it is
>>>>>> only used
>>>>>> >>>>> internally by the runtime.
>>>>>> >>>>>
>>>>>> >>>>> Cheers,
>>>>>> >>>>> Till
>>>>>> >>>>>
>>>>>> >>>>>
>>>>>> >>>>> On Tue, Mar 29, 2016 at 12:09 PM, Stefano Bortoli <
>>>>>> s.bortoli@gmail.com>
>>>>>> >>>>> wrote:
>>>>>> >>>>>>
>>>>>> >>>>>> Well, in theory yes. Each task has a thread, but only a number
>>>>>> is run
>>>>>> >>>>>> in parallel (the job of the scheduler).  Parallelism is set in
>>>>>> the
>>>>>> >>>>>> environment. However, whereas the parallelism parameter is set
>>>>>> and read
>>>>>> >>>>>> correctly, when it comes to actual starting of the threads,
>>>>>> the number is
>>>>>> >>>>>> fix to 8. We run a debugger to get to the point where the
>>>>>> thread was
>>>>>> >>>>>> started. As Flavio mentioned, the ExecutionContext has the
>>>>>> parallelims set
>>>>>> >>>>>> to 8. We have a pool of connections to a RDBS and il logs the
>>>>>> creation of
>>>>>> >>>>>> just 8 connections although parallelism is much higher.
>>>>>> >>>>>>
>>>>>> >>>>>> My question is whether this is a bug (or a feature) of the
>>>>>> >>>>>> LocalMiniCluster. :-) I am not scala expert, but I see some
>>>>>> variable
>>>>>> >>>>>> assignment in setting up of the MiniCluster, involving
>>>>>> parallelism and
>>>>>> >>>>>> 'default values'. Default values in terms of parallelism are
>>>>>> based on the
>>>>>> >>>>>> number of cores.
>>>>>> >>>>>>
>>>>>> >>>>>> thanks a lot for the support!
>>>>>> >>>>>>
>>>>>> >>>>>> saluti,
>>>>>> >>>>>> Stefano
>>>>>> >>>>>>
>>>>>> >>>>>> 2016-03-29 11:51 GMT+02:00 Ufuk Celebi <uc...@apache.org>:
>>>>>> >>>>>>>
>>>>>> >>>>>>> Hey Stefano,
>>>>>> >>>>>>>
>>>>>> >>>>>>> this should work by setting the parallelism on the
>>>>>> environment, e.g.
>>>>>> >>>>>>>
>>>>>> >>>>>>> env.setParallelism(32)
>>>>>> >>>>>>>
>>>>>> >>>>>>> Is this what you are doing?
>>>>>> >>>>>>>
>>>>>> >>>>>>> The task threads are not part of a pool, but each submitted
>>>>>> task
>>>>>> >>>>>>> creates its own Thread.
>>>>>> >>>>>>>
>>>>>> >>>>>>> – Ufuk
>>>>>> >>>>>>>
>>>>>> >>>>>>>
>>>>>> >>>>>>> On Fri, Mar 25, 2016 at 9:10 PM, Flavio Pompermaier
>>>>>> >>>>>>> <po...@okkam.it> wrote:
>>>>>> >>>>>>> > Any help here? I think that the problem is that the
>>>>>> JobManager
>>>>>> >>>>>>> > creates the
>>>>>> >>>>>>> > executionContext of the scheduler with
>>>>>> >>>>>>> >
>>>>>> >>>>>>> >        val executionContext =
>>>>>> ExecutionContext.fromExecutor(new
>>>>>> >>>>>>> > ForkJoinPool())
>>>>>> >>>>>>> >
>>>>>> >>>>>>> > and thus the number of concurrently running threads is
>>>>>> limited to
>>>>>> >>>>>>> > the number
>>>>>> >>>>>>> > of cores (using the default constructor of the
>>>>>> ForkJoinPool).
>>>>>> >>>>>>> > What do you think?
>>>>>> >>>>>>> >
>>>>>> >>>>>>> >
>>>>>> >>>>>>> > On Wed, Mar 23, 2016 at 6:55 PM, Stefano Bortoli
>>>>>> >>>>>>> > <s....@gmail.com>
>>>>>> >>>>>>> > wrote:
>>>>>> >>>>>>> >>
>>>>>> >>>>>>> >> Hi guys,
>>>>>> >>>>>>> >>
>>>>>> >>>>>>> >> I am trying to test a job that should run a number of
>>>>>> tasks to
>>>>>> >>>>>>> >> read from a
>>>>>> >>>>>>> >> RDBMS using an improved JDBC connector. The connection and
>>>>>> the
>>>>>> >>>>>>> >> reading run
>>>>>> >>>>>>> >> smoothly, but I cannot seem to be able to move above the
>>>>>> limit of
>>>>>> >>>>>>> >> 8
>>>>>> >>>>>>> >> concurrent threads running. 8 is of course the number of
>>>>>> cores of
>>>>>> >>>>>>> >> my
>>>>>> >>>>>>> >> machine.
>>>>>> >>>>>>> >>
>>>>>> >>>>>>> >> I have tried working around configurations and settings,
>>>>>> but the
>>>>>> >>>>>>> >> Executor
>>>>>> >>>>>>> >> within the ExecutionContext keeps on having a parallelism
>>>>>> of 8.
>>>>>> >>>>>>> >> Although, of
>>>>>> >>>>>>> >> course, the parallelism of the execution environment is
>>>>>> much
>>>>>> >>>>>>> >> higher (in fact
>>>>>> >>>>>>> >> I have many more tasks to be allocated).
>>>>>> >>>>>>> >>
>>>>>> >>>>>>> >> I feel it may be an issue of the LocalMiniCluster
>>>>>> configuration
>>>>>> >>>>>>> >> that may
>>>>>> >>>>>>> >> just override/neglect my wish for higher degree of
>>>>>> parallelism. Is
>>>>>> >>>>>>> >> there a
>>>>>> >>>>>>> >> way for me to work around this issue?
>>>>>> >>>>>>> >>
>>>>>> >>>>>>> >> please let me know. Thanks a lot for you help! :-)
>>>>>> >>>>>>> >>
>>>>>> >>>>>>> >> saluti,
>>>>>> >>>>>>> >> Stefano
>>>>>> >>>>>>> >
>>>>>> >>>>>>> >
>>>>>> >>>>>>> >
>>>>>> >>>>>>
>>>>>> >>>>>>
>>>>>> >>>>>
>>>>>> >>>>
>>>>>> >>>
>>>>>> >>
>>>>>> >
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>

Re: threads, parallelism and task managers

Posted by Stefano Bortoli <s....@gmail.com>.
Sounds you are damn right! thanks for the insight, dumb on us for not
checking this before.

saluti,
Stefano

2016-04-13 11:05 GMT+02:00 Stephan Ewen <se...@apache.org>:

> Sounds actually not like a Flink issue. I would look into the commons pool
> docs.
> Maybe they size their pools by default with the number of cores, so the
> pool has only 8 threads, and other requests are queues?
>
> On Wed, Apr 13, 2016 at 10:29 AM, Flavio Pompermaier <pompermaier@okkam.it
> > wrote:
>
>> Any feedback about our JDBC InputFormat issue..?
>>
>> On Thu, Apr 7, 2016 at 12:37 PM, Flavio Pompermaier <pompermaier@okkam.it
>> > wrote:
>>
>>> We've finally created a running example (For Flink 0.10.2) of our
>>> improved JDBC imputformat that you can run from an IDE (it creates an
>>> in-memory derby database with 1000 rows and batch of 10) at
>>> https://gist.github.com/fpompermaier/bcd704abc93b25b6744ac76ac17ed351.
>>> The first time you run the program you have to comment the following
>>> line:
>>>
>>>         stmt.executeUpdate("Drop Table users ");
>>>
>>> In your pom declare the following dependencies:
>>>
>>> <dependency>
>>> <groupId>org.apache.derby</groupId>
>>> <artifactId>derby</artifactId>
>>> <version>10.10.1.1</version>
>>> </dependency>
>>> <dependency>
>>> <groupId>org.apache.commons</groupId>
>>> <artifactId>commons-pool2</artifactId>
>>> <version>2.4.2</version>
>>> </dependency>
>>>
>>> In my laptop I have 8 cores and if I put parallelism to 16 I expect to
>>> see 16 calls to the connection pool (i.e. '==================== CREATING
>>> NEW CONNECTION!') while I see only 8 (up to my maximum number of cores).
>>> The number of created task instead is correct (16).
>>>
>>> I hope this could help in understanding where the problem is!
>>>
>>> Best and thank in advance,
>>> Flavio
>>>
>>> On Wed, Mar 30, 2016 at 11:01 AM, Stefano Bortoli <s....@gmail.com>
>>> wrote:
>>>
>>>> Hi Ufuk,
>>>>
>>>> here is our preliminary input formar implementation:
>>>> https://gist.github.com/anonymous/dbf05cad2a6cc07b8aa88e74a2c23119
>>>>
>>>> if you need a running project, I will have to create a test one cause I
>>>> cannot share the current configuration.
>>>>
>>>> thanks a lot in advance!
>>>>
>>>>
>>>>
>>>> 2016-03-30 10:13 GMT+02:00 Ufuk Celebi <uc...@apache.org>:
>>>>
>>>>> Do you have the code somewhere online? Maybe someone can have a quick
>>>>> look over it later. I'm pretty sure that is indeed a problem with the
>>>>> custom input format.
>>>>>
>>>>> – Ufuk
>>>>>
>>>>> On Tue, Mar 29, 2016 at 3:50 PM, Stefano Bortoli <s....@gmail.com>
>>>>> wrote:
>>>>> > Perhaps there is a misunderstanding on my side over the parallelism
>>>>> and
>>>>> > split management given a data source.
>>>>> >
>>>>> > We started from the current JDBCInputFormat to make it multi-thread.
>>>>> Then,
>>>>> > given a space of keys, we create the splits based on a fetchsize set
>>>>> as a
>>>>> > parameter. In the open, we get a connection from the pool, and
>>>>> execute a
>>>>> > query using the split interval. This sets the 'resultSet', and then
>>>>> the
>>>>> > DatasourceTask iterates between reachedEnd, next and close. On
>>>>> close, the
>>>>> > connection is returned to the pool. We set parallelism to 32, and we
>>>>> would
>>>>> > expect 32 connection opened but the connections opened are just 8.
>>>>> >
>>>>> > We tried to make an example with the textinputformat, but being a
>>>>> > delimitedinpurformat, the open is called sequentially when
>>>>> statistics are
>>>>> > built, and then the processing is executed in parallel just after
>>>>> all the
>>>>> > open are executed. This is not feasible in our case, because there
>>>>> would be
>>>>> > millions of queries before the statistics are collected.
>>>>> >
>>>>> > Perhaps we are doing something wrong, still to figure out what. :-/
>>>>> >
>>>>> > thanks a lot for your help.
>>>>> >
>>>>> > saluti,
>>>>> > Stefano
>>>>> >
>>>>> >
>>>>> > 2016-03-29 13:30 GMT+02:00 Stefano Bortoli <s....@gmail.com>:
>>>>> >>
>>>>> >> That is exactly my point. I should have 32 threads running, but I
>>>>> have
>>>>> >> only 8. 32 Task are created, but only only 8 are run concurrently.
>>>>> Flavio
>>>>> >> and I will try to make a simple program to produce the problem. If
>>>>> we solve
>>>>> >> our issues on the way, we'll let you know.
>>>>> >>
>>>>> >> thanks a lot anyway.
>>>>> >>
>>>>> >> saluti,
>>>>> >> Stefano
>>>>> >>
>>>>> >> 2016-03-29 12:44 GMT+02:00 Till Rohrmann <tr...@apache.org>:
>>>>> >>>
>>>>> >>> Then it shouldn’t be a problem. The ExeuctionContetxt is used to
>>>>> run
>>>>> >>> futures and their callbacks. But as Ufuk said, each task will
>>>>> spawn it’s own
>>>>> >>> thread and if you set the parallelism to 32 then you should have
>>>>> 32 threads
>>>>> >>> running.
>>>>> >>>
>>>>> >>>
>>>>> >>> On Tue, Mar 29, 2016 at 12:29 PM, Stefano Bortoli <
>>>>> s.bortoli@gmail.com>
>>>>> >>> wrote:
>>>>> >>>>
>>>>> >>>> In fact, I don't use it. I just had to crawl back the runtime
>>>>> >>>> implementation to get to the point where parallelism was
>>>>> switching from 32
>>>>> >>>> to 8.
>>>>> >>>>
>>>>> >>>> saluti,
>>>>> >>>> Stefano
>>>>> >>>>
>>>>> >>>> 2016-03-29 12:24 GMT+02:00 Till Rohrmann <till.rohrmann@gmail.com
>>>>> >:
>>>>> >>>>>
>>>>> >>>>> Hi,
>>>>> >>>>>
>>>>> >>>>> for what do you use the ExecutionContext? That should actually be
>>>>> >>>>> something which you shouldn’t be concerned with since it is only
>>>>> used
>>>>> >>>>> internally by the runtime.
>>>>> >>>>>
>>>>> >>>>> Cheers,
>>>>> >>>>> Till
>>>>> >>>>>
>>>>> >>>>>
>>>>> >>>>> On Tue, Mar 29, 2016 at 12:09 PM, Stefano Bortoli <
>>>>> s.bortoli@gmail.com>
>>>>> >>>>> wrote:
>>>>> >>>>>>
>>>>> >>>>>> Well, in theory yes. Each task has a thread, but only a number
>>>>> is run
>>>>> >>>>>> in parallel (the job of the scheduler).  Parallelism is set in
>>>>> the
>>>>> >>>>>> environment. However, whereas the parallelism parameter is set
>>>>> and read
>>>>> >>>>>> correctly, when it comes to actual starting of the threads, the
>>>>> number is
>>>>> >>>>>> fix to 8. We run a debugger to get to the point where the
>>>>> thread was
>>>>> >>>>>> started. As Flavio mentioned, the ExecutionContext has the
>>>>> parallelims set
>>>>> >>>>>> to 8. We have a pool of connections to a RDBS and il logs the
>>>>> creation of
>>>>> >>>>>> just 8 connections although parallelism is much higher.
>>>>> >>>>>>
>>>>> >>>>>> My question is whether this is a bug (or a feature) of the
>>>>> >>>>>> LocalMiniCluster. :-) I am not scala expert, but I see some
>>>>> variable
>>>>> >>>>>> assignment in setting up of the MiniCluster, involving
>>>>> parallelism and
>>>>> >>>>>> 'default values'. Default values in terms of parallelism are
>>>>> based on the
>>>>> >>>>>> number of cores.
>>>>> >>>>>>
>>>>> >>>>>> thanks a lot for the support!
>>>>> >>>>>>
>>>>> >>>>>> saluti,
>>>>> >>>>>> Stefano
>>>>> >>>>>>
>>>>> >>>>>> 2016-03-29 11:51 GMT+02:00 Ufuk Celebi <uc...@apache.org>:
>>>>> >>>>>>>
>>>>> >>>>>>> Hey Stefano,
>>>>> >>>>>>>
>>>>> >>>>>>> this should work by setting the parallelism on the
>>>>> environment, e.g.
>>>>> >>>>>>>
>>>>> >>>>>>> env.setParallelism(32)
>>>>> >>>>>>>
>>>>> >>>>>>> Is this what you are doing?
>>>>> >>>>>>>
>>>>> >>>>>>> The task threads are not part of a pool, but each submitted
>>>>> task
>>>>> >>>>>>> creates its own Thread.
>>>>> >>>>>>>
>>>>> >>>>>>> – Ufuk
>>>>> >>>>>>>
>>>>> >>>>>>>
>>>>> >>>>>>> On Fri, Mar 25, 2016 at 9:10 PM, Flavio Pompermaier
>>>>> >>>>>>> <po...@okkam.it> wrote:
>>>>> >>>>>>> > Any help here? I think that the problem is that the
>>>>> JobManager
>>>>> >>>>>>> > creates the
>>>>> >>>>>>> > executionContext of the scheduler with
>>>>> >>>>>>> >
>>>>> >>>>>>> >        val executionContext =
>>>>> ExecutionContext.fromExecutor(new
>>>>> >>>>>>> > ForkJoinPool())
>>>>> >>>>>>> >
>>>>> >>>>>>> > and thus the number of concurrently running threads is
>>>>> limited to
>>>>> >>>>>>> > the number
>>>>> >>>>>>> > of cores (using the default constructor of the ForkJoinPool).
>>>>> >>>>>>> > What do you think?
>>>>> >>>>>>> >
>>>>> >>>>>>> >
>>>>> >>>>>>> > On Wed, Mar 23, 2016 at 6:55 PM, Stefano Bortoli
>>>>> >>>>>>> > <s....@gmail.com>
>>>>> >>>>>>> > wrote:
>>>>> >>>>>>> >>
>>>>> >>>>>>> >> Hi guys,
>>>>> >>>>>>> >>
>>>>> >>>>>>> >> I am trying to test a job that should run a number of tasks
>>>>> to
>>>>> >>>>>>> >> read from a
>>>>> >>>>>>> >> RDBMS using an improved JDBC connector. The connection and
>>>>> the
>>>>> >>>>>>> >> reading run
>>>>> >>>>>>> >> smoothly, but I cannot seem to be able to move above the
>>>>> limit of
>>>>> >>>>>>> >> 8
>>>>> >>>>>>> >> concurrent threads running. 8 is of course the number of
>>>>> cores of
>>>>> >>>>>>> >> my
>>>>> >>>>>>> >> machine.
>>>>> >>>>>>> >>
>>>>> >>>>>>> >> I have tried working around configurations and settings,
>>>>> but the
>>>>> >>>>>>> >> Executor
>>>>> >>>>>>> >> within the ExecutionContext keeps on having a parallelism
>>>>> of 8.
>>>>> >>>>>>> >> Although, of
>>>>> >>>>>>> >> course, the parallelism of the execution environment is much
>>>>> >>>>>>> >> higher (in fact
>>>>> >>>>>>> >> I have many more tasks to be allocated).
>>>>> >>>>>>> >>
>>>>> >>>>>>> >> I feel it may be an issue of the LocalMiniCluster
>>>>> configuration
>>>>> >>>>>>> >> that may
>>>>> >>>>>>> >> just override/neglect my wish for higher degree of
>>>>> parallelism. Is
>>>>> >>>>>>> >> there a
>>>>> >>>>>>> >> way for me to work around this issue?
>>>>> >>>>>>> >>
>>>>> >>>>>>> >> please let me know. Thanks a lot for you help! :-)
>>>>> >>>>>>> >>
>>>>> >>>>>>> >> saluti,
>>>>> >>>>>>> >> Stefano
>>>>> >>>>>>> >
>>>>> >>>>>>> >
>>>>> >>>>>>> >
>>>>> >>>>>>
>>>>> >>>>>>
>>>>> >>>>>
>>>>> >>>>
>>>>> >>>
>>>>> >>
>>>>> >
>>>>>
>>>>
>>>>
>>>
>>
>

Re: threads, parallelism and task managers

Posted by Stephan Ewen <se...@apache.org>.
Sounds actually not like a Flink issue. I would look into the commons pool
docs.
Maybe they size their pools by default with the number of cores, so the
pool has only 8 threads, and other requests are queues?

On Wed, Apr 13, 2016 at 10:29 AM, Flavio Pompermaier <po...@okkam.it>
wrote:

> Any feedback about our JDBC InputFormat issue..?
>
> On Thu, Apr 7, 2016 at 12:37 PM, Flavio Pompermaier <po...@okkam.it>
> wrote:
>
>> We've finally created a running example (For Flink 0.10.2) of our
>> improved JDBC imputformat that you can run from an IDE (it creates an
>> in-memory derby database with 1000 rows and batch of 10) at
>> https://gist.github.com/fpompermaier/bcd704abc93b25b6744ac76ac17ed351.
>> The first time you run the program you have to comment the following line:
>>
>>         stmt.executeUpdate("Drop Table users ");
>>
>> In your pom declare the following dependencies:
>>
>> <dependency>
>> <groupId>org.apache.derby</groupId>
>> <artifactId>derby</artifactId>
>> <version>10.10.1.1</version>
>> </dependency>
>> <dependency>
>> <groupId>org.apache.commons</groupId>
>> <artifactId>commons-pool2</artifactId>
>> <version>2.4.2</version>
>> </dependency>
>>
>> In my laptop I have 8 cores and if I put parallelism to 16 I expect to
>> see 16 calls to the connection pool (i.e. '==================== CREATING
>> NEW CONNECTION!') while I see only 8 (up to my maximum number of cores).
>> The number of created task instead is correct (16).
>>
>> I hope this could help in understanding where the problem is!
>>
>> Best and thank in advance,
>> Flavio
>>
>> On Wed, Mar 30, 2016 at 11:01 AM, Stefano Bortoli <s....@gmail.com>
>> wrote:
>>
>>> Hi Ufuk,
>>>
>>> here is our preliminary input formar implementation:
>>> https://gist.github.com/anonymous/dbf05cad2a6cc07b8aa88e74a2c23119
>>>
>>> if you need a running project, I will have to create a test one cause I
>>> cannot share the current configuration.
>>>
>>> thanks a lot in advance!
>>>
>>>
>>>
>>> 2016-03-30 10:13 GMT+02:00 Ufuk Celebi <uc...@apache.org>:
>>>
>>>> Do you have the code somewhere online? Maybe someone can have a quick
>>>> look over it later. I'm pretty sure that is indeed a problem with the
>>>> custom input format.
>>>>
>>>> – Ufuk
>>>>
>>>> On Tue, Mar 29, 2016 at 3:50 PM, Stefano Bortoli <s....@gmail.com>
>>>> wrote:
>>>> > Perhaps there is a misunderstanding on my side over the parallelism
>>>> and
>>>> > split management given a data source.
>>>> >
>>>> > We started from the current JDBCInputFormat to make it multi-thread.
>>>> Then,
>>>> > given a space of keys, we create the splits based on a fetchsize set
>>>> as a
>>>> > parameter. In the open, we get a connection from the pool, and
>>>> execute a
>>>> > query using the split interval. This sets the 'resultSet', and then
>>>> the
>>>> > DatasourceTask iterates between reachedEnd, next and close. On close,
>>>> the
>>>> > connection is returned to the pool. We set parallelism to 32, and we
>>>> would
>>>> > expect 32 connection opened but the connections opened are just 8.
>>>> >
>>>> > We tried to make an example with the textinputformat, but being a
>>>> > delimitedinpurformat, the open is called sequentially when statistics
>>>> are
>>>> > built, and then the processing is executed in parallel just after all
>>>> the
>>>> > open are executed. This is not feasible in our case, because there
>>>> would be
>>>> > millions of queries before the statistics are collected.
>>>> >
>>>> > Perhaps we are doing something wrong, still to figure out what. :-/
>>>> >
>>>> > thanks a lot for your help.
>>>> >
>>>> > saluti,
>>>> > Stefano
>>>> >
>>>> >
>>>> > 2016-03-29 13:30 GMT+02:00 Stefano Bortoli <s....@gmail.com>:
>>>> >>
>>>> >> That is exactly my point. I should have 32 threads running, but I
>>>> have
>>>> >> only 8. 32 Task are created, but only only 8 are run concurrently.
>>>> Flavio
>>>> >> and I will try to make a simple program to produce the problem. If
>>>> we solve
>>>> >> our issues on the way, we'll let you know.
>>>> >>
>>>> >> thanks a lot anyway.
>>>> >>
>>>> >> saluti,
>>>> >> Stefano
>>>> >>
>>>> >> 2016-03-29 12:44 GMT+02:00 Till Rohrmann <tr...@apache.org>:
>>>> >>>
>>>> >>> Then it shouldn’t be a problem. The ExeuctionContetxt is used to run
>>>> >>> futures and their callbacks. But as Ufuk said, each task will spawn
>>>> it’s own
>>>> >>> thread and if you set the parallelism to 32 then you should have 32
>>>> threads
>>>> >>> running.
>>>> >>>
>>>> >>>
>>>> >>> On Tue, Mar 29, 2016 at 12:29 PM, Stefano Bortoli <
>>>> s.bortoli@gmail.com>
>>>> >>> wrote:
>>>> >>>>
>>>> >>>> In fact, I don't use it. I just had to crawl back the runtime
>>>> >>>> implementation to get to the point where parallelism was switching
>>>> from 32
>>>> >>>> to 8.
>>>> >>>>
>>>> >>>> saluti,
>>>> >>>> Stefano
>>>> >>>>
>>>> >>>> 2016-03-29 12:24 GMT+02:00 Till Rohrmann <till.rohrmann@gmail.com
>>>> >:
>>>> >>>>>
>>>> >>>>> Hi,
>>>> >>>>>
>>>> >>>>> for what do you use the ExecutionContext? That should actually be
>>>> >>>>> something which you shouldn’t be concerned with since it is only
>>>> used
>>>> >>>>> internally by the runtime.
>>>> >>>>>
>>>> >>>>> Cheers,
>>>> >>>>> Till
>>>> >>>>>
>>>> >>>>>
>>>> >>>>> On Tue, Mar 29, 2016 at 12:09 PM, Stefano Bortoli <
>>>> s.bortoli@gmail.com>
>>>> >>>>> wrote:
>>>> >>>>>>
>>>> >>>>>> Well, in theory yes. Each task has a thread, but only a number
>>>> is run
>>>> >>>>>> in parallel (the job of the scheduler).  Parallelism is set in
>>>> the
>>>> >>>>>> environment. However, whereas the parallelism parameter is set
>>>> and read
>>>> >>>>>> correctly, when it comes to actual starting of the threads, the
>>>> number is
>>>> >>>>>> fix to 8. We run a debugger to get to the point where the thread
>>>> was
>>>> >>>>>> started. As Flavio mentioned, the ExecutionContext has the
>>>> parallelims set
>>>> >>>>>> to 8. We have a pool of connections to a RDBS and il logs the
>>>> creation of
>>>> >>>>>> just 8 connections although parallelism is much higher.
>>>> >>>>>>
>>>> >>>>>> My question is whether this is a bug (or a feature) of the
>>>> >>>>>> LocalMiniCluster. :-) I am not scala expert, but I see some
>>>> variable
>>>> >>>>>> assignment in setting up of the MiniCluster, involving
>>>> parallelism and
>>>> >>>>>> 'default values'. Default values in terms of parallelism are
>>>> based on the
>>>> >>>>>> number of cores.
>>>> >>>>>>
>>>> >>>>>> thanks a lot for the support!
>>>> >>>>>>
>>>> >>>>>> saluti,
>>>> >>>>>> Stefano
>>>> >>>>>>
>>>> >>>>>> 2016-03-29 11:51 GMT+02:00 Ufuk Celebi <uc...@apache.org>:
>>>> >>>>>>>
>>>> >>>>>>> Hey Stefano,
>>>> >>>>>>>
>>>> >>>>>>> this should work by setting the parallelism on the environment,
>>>> e.g.
>>>> >>>>>>>
>>>> >>>>>>> env.setParallelism(32)
>>>> >>>>>>>
>>>> >>>>>>> Is this what you are doing?
>>>> >>>>>>>
>>>> >>>>>>> The task threads are not part of a pool, but each submitted task
>>>> >>>>>>> creates its own Thread.
>>>> >>>>>>>
>>>> >>>>>>> – Ufuk
>>>> >>>>>>>
>>>> >>>>>>>
>>>> >>>>>>> On Fri, Mar 25, 2016 at 9:10 PM, Flavio Pompermaier
>>>> >>>>>>> <po...@okkam.it> wrote:
>>>> >>>>>>> > Any help here? I think that the problem is that the JobManager
>>>> >>>>>>> > creates the
>>>> >>>>>>> > executionContext of the scheduler with
>>>> >>>>>>> >
>>>> >>>>>>> >        val executionContext =
>>>> ExecutionContext.fromExecutor(new
>>>> >>>>>>> > ForkJoinPool())
>>>> >>>>>>> >
>>>> >>>>>>> > and thus the number of concurrently running threads is
>>>> limited to
>>>> >>>>>>> > the number
>>>> >>>>>>> > of cores (using the default constructor of the ForkJoinPool).
>>>> >>>>>>> > What do you think?
>>>> >>>>>>> >
>>>> >>>>>>> >
>>>> >>>>>>> > On Wed, Mar 23, 2016 at 6:55 PM, Stefano Bortoli
>>>> >>>>>>> > <s....@gmail.com>
>>>> >>>>>>> > wrote:
>>>> >>>>>>> >>
>>>> >>>>>>> >> Hi guys,
>>>> >>>>>>> >>
>>>> >>>>>>> >> I am trying to test a job that should run a number of tasks
>>>> to
>>>> >>>>>>> >> read from a
>>>> >>>>>>> >> RDBMS using an improved JDBC connector. The connection and
>>>> the
>>>> >>>>>>> >> reading run
>>>> >>>>>>> >> smoothly, but I cannot seem to be able to move above the
>>>> limit of
>>>> >>>>>>> >> 8
>>>> >>>>>>> >> concurrent threads running. 8 is of course the number of
>>>> cores of
>>>> >>>>>>> >> my
>>>> >>>>>>> >> machine.
>>>> >>>>>>> >>
>>>> >>>>>>> >> I have tried working around configurations and settings, but
>>>> the
>>>> >>>>>>> >> Executor
>>>> >>>>>>> >> within the ExecutionContext keeps on having a parallelism of
>>>> 8.
>>>> >>>>>>> >> Although, of
>>>> >>>>>>> >> course, the parallelism of the execution environment is much
>>>> >>>>>>> >> higher (in fact
>>>> >>>>>>> >> I have many more tasks to be allocated).
>>>> >>>>>>> >>
>>>> >>>>>>> >> I feel it may be an issue of the LocalMiniCluster
>>>> configuration
>>>> >>>>>>> >> that may
>>>> >>>>>>> >> just override/neglect my wish for higher degree of
>>>> parallelism. Is
>>>> >>>>>>> >> there a
>>>> >>>>>>> >> way for me to work around this issue?
>>>> >>>>>>> >>
>>>> >>>>>>> >> please let me know. Thanks a lot for you help! :-)
>>>> >>>>>>> >>
>>>> >>>>>>> >> saluti,
>>>> >>>>>>> >> Stefano
>>>> >>>>>>> >
>>>> >>>>>>> >
>>>> >>>>>>> >
>>>> >>>>>>
>>>> >>>>>>
>>>> >>>>>
>>>> >>>>
>>>> >>>
>>>> >>
>>>> >
>>>>
>>>
>>>
>>
>

Re: threads, parallelism and task managers

Posted by Flavio Pompermaier <po...@okkam.it>.
Any feedback about our JDBC InputFormat issue..?

On Thu, Apr 7, 2016 at 12:37 PM, Flavio Pompermaier <po...@okkam.it>
wrote:

> We've finally created a running example (For Flink 0.10.2) of our improved
> JDBC imputformat that you can run from an IDE (it creates an in-memory
> derby database with 1000 rows and batch of 10) at
> https://gist.github.com/fpompermaier/bcd704abc93b25b6744ac76ac17ed351.
> The first time you run the program you have to comment the following line:
>
>         stmt.executeUpdate("Drop Table users ");
>
> In your pom declare the following dependencies:
>
> <dependency>
> <groupId>org.apache.derby</groupId>
> <artifactId>derby</artifactId>
> <version>10.10.1.1</version>
> </dependency>
> <dependency>
> <groupId>org.apache.commons</groupId>
> <artifactId>commons-pool2</artifactId>
> <version>2.4.2</version>
> </dependency>
>
> In my laptop I have 8 cores and if I put parallelism to 16 I expect to see
> 16 calls to the connection pool (i.e. '==================== CREATING NEW
> CONNECTION!') while I see only 8 (up to my maximum number of cores).
> The number of created task instead is correct (16).
>
> I hope this could help in understanding where the problem is!
>
> Best and thank in advance,
> Flavio
>
> On Wed, Mar 30, 2016 at 11:01 AM, Stefano Bortoli <s....@gmail.com>
> wrote:
>
>> Hi Ufuk,
>>
>> here is our preliminary input formar implementation:
>> https://gist.github.com/anonymous/dbf05cad2a6cc07b8aa88e74a2c23119
>>
>> if you need a running project, I will have to create a test one cause I
>> cannot share the current configuration.
>>
>> thanks a lot in advance!
>>
>>
>>
>> 2016-03-30 10:13 GMT+02:00 Ufuk Celebi <uc...@apache.org>:
>>
>>> Do you have the code somewhere online? Maybe someone can have a quick
>>> look over it later. I'm pretty sure that is indeed a problem with the
>>> custom input format.
>>>
>>> – Ufuk
>>>
>>> On Tue, Mar 29, 2016 at 3:50 PM, Stefano Bortoli <s....@gmail.com>
>>> wrote:
>>> > Perhaps there is a misunderstanding on my side over the parallelism and
>>> > split management given a data source.
>>> >
>>> > We started from the current JDBCInputFormat to make it multi-thread.
>>> Then,
>>> > given a space of keys, we create the splits based on a fetchsize set
>>> as a
>>> > parameter. In the open, we get a connection from the pool, and execute
>>> a
>>> > query using the split interval. This sets the 'resultSet', and then the
>>> > DatasourceTask iterates between reachedEnd, next and close. On close,
>>> the
>>> > connection is returned to the pool. We set parallelism to 32, and we
>>> would
>>> > expect 32 connection opened but the connections opened are just 8.
>>> >
>>> > We tried to make an example with the textinputformat, but being a
>>> > delimitedinpurformat, the open is called sequentially when statistics
>>> are
>>> > built, and then the processing is executed in parallel just after all
>>> the
>>> > open are executed. This is not feasible in our case, because there
>>> would be
>>> > millions of queries before the statistics are collected.
>>> >
>>> > Perhaps we are doing something wrong, still to figure out what. :-/
>>> >
>>> > thanks a lot for your help.
>>> >
>>> > saluti,
>>> > Stefano
>>> >
>>> >
>>> > 2016-03-29 13:30 GMT+02:00 Stefano Bortoli <s....@gmail.com>:
>>> >>
>>> >> That is exactly my point. I should have 32 threads running, but I have
>>> >> only 8. 32 Task are created, but only only 8 are run concurrently.
>>> Flavio
>>> >> and I will try to make a simple program to produce the problem. If we
>>> solve
>>> >> our issues on the way, we'll let you know.
>>> >>
>>> >> thanks a lot anyway.
>>> >>
>>> >> saluti,
>>> >> Stefano
>>> >>
>>> >> 2016-03-29 12:44 GMT+02:00 Till Rohrmann <tr...@apache.org>:
>>> >>>
>>> >>> Then it shouldn’t be a problem. The ExeuctionContetxt is used to run
>>> >>> futures and their callbacks. But as Ufuk said, each task will spawn
>>> it’s own
>>> >>> thread and if you set the parallelism to 32 then you should have 32
>>> threads
>>> >>> running.
>>> >>>
>>> >>>
>>> >>> On Tue, Mar 29, 2016 at 12:29 PM, Stefano Bortoli <
>>> s.bortoli@gmail.com>
>>> >>> wrote:
>>> >>>>
>>> >>>> In fact, I don't use it. I just had to crawl back the runtime
>>> >>>> implementation to get to the point where parallelism was switching
>>> from 32
>>> >>>> to 8.
>>> >>>>
>>> >>>> saluti,
>>> >>>> Stefano
>>> >>>>
>>> >>>> 2016-03-29 12:24 GMT+02:00 Till Rohrmann <ti...@gmail.com>:
>>> >>>>>
>>> >>>>> Hi,
>>> >>>>>
>>> >>>>> for what do you use the ExecutionContext? That should actually be
>>> >>>>> something which you shouldn’t be concerned with since it is only
>>> used
>>> >>>>> internally by the runtime.
>>> >>>>>
>>> >>>>> Cheers,
>>> >>>>> Till
>>> >>>>>
>>> >>>>>
>>> >>>>> On Tue, Mar 29, 2016 at 12:09 PM, Stefano Bortoli <
>>> s.bortoli@gmail.com>
>>> >>>>> wrote:
>>> >>>>>>
>>> >>>>>> Well, in theory yes. Each task has a thread, but only a number is
>>> run
>>> >>>>>> in parallel (the job of the scheduler).  Parallelism is set in the
>>> >>>>>> environment. However, whereas the parallelism parameter is set
>>> and read
>>> >>>>>> correctly, when it comes to actual starting of the threads, the
>>> number is
>>> >>>>>> fix to 8. We run a debugger to get to the point where the thread
>>> was
>>> >>>>>> started. As Flavio mentioned, the ExecutionContext has the
>>> parallelims set
>>> >>>>>> to 8. We have a pool of connections to a RDBS and il logs the
>>> creation of
>>> >>>>>> just 8 connections although parallelism is much higher.
>>> >>>>>>
>>> >>>>>> My question is whether this is a bug (or a feature) of the
>>> >>>>>> LocalMiniCluster. :-) I am not scala expert, but I see some
>>> variable
>>> >>>>>> assignment in setting up of the MiniCluster, involving
>>> parallelism and
>>> >>>>>> 'default values'. Default values in terms of parallelism are
>>> based on the
>>> >>>>>> number of cores.
>>> >>>>>>
>>> >>>>>> thanks a lot for the support!
>>> >>>>>>
>>> >>>>>> saluti,
>>> >>>>>> Stefano
>>> >>>>>>
>>> >>>>>> 2016-03-29 11:51 GMT+02:00 Ufuk Celebi <uc...@apache.org>:
>>> >>>>>>>
>>> >>>>>>> Hey Stefano,
>>> >>>>>>>
>>> >>>>>>> this should work by setting the parallelism on the environment,
>>> e.g.
>>> >>>>>>>
>>> >>>>>>> env.setParallelism(32)
>>> >>>>>>>
>>> >>>>>>> Is this what you are doing?
>>> >>>>>>>
>>> >>>>>>> The task threads are not part of a pool, but each submitted task
>>> >>>>>>> creates its own Thread.
>>> >>>>>>>
>>> >>>>>>> – Ufuk
>>> >>>>>>>
>>> >>>>>>>
>>> >>>>>>> On Fri, Mar 25, 2016 at 9:10 PM, Flavio Pompermaier
>>> >>>>>>> <po...@okkam.it> wrote:
>>> >>>>>>> > Any help here? I think that the problem is that the JobManager
>>> >>>>>>> > creates the
>>> >>>>>>> > executionContext of the scheduler with
>>> >>>>>>> >
>>> >>>>>>> >        val executionContext = ExecutionContext.fromExecutor(new
>>> >>>>>>> > ForkJoinPool())
>>> >>>>>>> >
>>> >>>>>>> > and thus the number of concurrently running threads is limited
>>> to
>>> >>>>>>> > the number
>>> >>>>>>> > of cores (using the default constructor of the ForkJoinPool).
>>> >>>>>>> > What do you think?
>>> >>>>>>> >
>>> >>>>>>> >
>>> >>>>>>> > On Wed, Mar 23, 2016 at 6:55 PM, Stefano Bortoli
>>> >>>>>>> > <s....@gmail.com>
>>> >>>>>>> > wrote:
>>> >>>>>>> >>
>>> >>>>>>> >> Hi guys,
>>> >>>>>>> >>
>>> >>>>>>> >> I am trying to test a job that should run a number of tasks to
>>> >>>>>>> >> read from a
>>> >>>>>>> >> RDBMS using an improved JDBC connector. The connection and the
>>> >>>>>>> >> reading run
>>> >>>>>>> >> smoothly, but I cannot seem to be able to move above the
>>> limit of
>>> >>>>>>> >> 8
>>> >>>>>>> >> concurrent threads running. 8 is of course the number of
>>> cores of
>>> >>>>>>> >> my
>>> >>>>>>> >> machine.
>>> >>>>>>> >>
>>> >>>>>>> >> I have tried working around configurations and settings, but
>>> the
>>> >>>>>>> >> Executor
>>> >>>>>>> >> within the ExecutionContext keeps on having a parallelism of
>>> 8.
>>> >>>>>>> >> Although, of
>>> >>>>>>> >> course, the parallelism of the execution environment is much
>>> >>>>>>> >> higher (in fact
>>> >>>>>>> >> I have many more tasks to be allocated).
>>> >>>>>>> >>
>>> >>>>>>> >> I feel it may be an issue of the LocalMiniCluster
>>> configuration
>>> >>>>>>> >> that may
>>> >>>>>>> >> just override/neglect my wish for higher degree of
>>> parallelism. Is
>>> >>>>>>> >> there a
>>> >>>>>>> >> way for me to work around this issue?
>>> >>>>>>> >>
>>> >>>>>>> >> please let me know. Thanks a lot for you help! :-)
>>> >>>>>>> >>
>>> >>>>>>> >> saluti,
>>> >>>>>>> >> Stefano
>>> >>>>>>> >
>>> >>>>>>> >
>>> >>>>>>> >
>>> >>>>>>
>>> >>>>>>
>>> >>>>>
>>> >>>>
>>> >>>
>>> >>
>>> >
>>>
>>
>>
>