You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-issues@hadoop.apache.org by "Raman Grover (JIRA)" <ji...@apache.org> on 2010/07/09 00:10:57 UTC

[jira] Created: (MAPREDUCE-1928) Dynamic information fed into Hadoop to control execution of a submiited job

Dynamic information fed into Hadoop to control execution of a submiited job
---------------------------------------------------------------------------

                 Key: MAPREDUCE-1928
                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-1928
             Project: Hadoop Map/Reduce
          Issue Type: New Feature
            Reporter: Raman Grover


Currently the job submission protocol requires the job provider to put every bit of information inside an instance of JobConf. The submitted information includes the input data (hdfs path) , suspected resource requirement, number of reducers etc.  This information is read by JobTracker as part of job initialization. Once initialized, job is moved into a running state. From this point, there is no mechanism for any additional information to be fed into Hadoop infrastructure for controlling the job execution. 
                           The execution pattern for the job looks very much static from this point. Using the size of input data and a few settings inside JobConf, number of mappers is computed. Hadoop attempts at reading the whole of data in parallel by launching parallel map tasks. Once map phase is over, a known number of reduce tasks (supplied as part of  JobConf) are started. 

Parameters that control the job execution were set in JobConf prior to reading the input data. As the map phase progresses, useful information based upon the content of the input data surfaces and can be used in controlling the further execution of the job. Let us walk through some of the examples where additional information can be fed to Hadoop subsequent to job submission for optimal execution of the job. 

1) "Process a part of the input , based upon the results decide if reading more input is required " 
    In a huge data set, user is interested in finding 'k' records that satisfy a predicate, essentially sampling the data. In current implementation, as the data is huge, a large no of mappers would be launched consuming a significant fraction of the available map slots in the cluster. Each map task would attempt at emitting a max of  'k' records. With N mappers, we get N*k records out of which one can pick any k to form the final result. 
   This is not optimal as:
   (i)  A larger number of map slots get occupied initially, affecting other jobs in the queue. 
   (ii) If the selectivity of input data is very low, we essentially did not need scanning the whole of data to form our result. 
        we could have finished by reading a fraction of input data, monitoring the cardinality of the map output and determining if 
       more input needs to be processed.  
   
   Optimal way: If reading the whole of input requires N mappers, launch only 'M' initially. Allow them to complete. Based upon the statistics collected, decide additional number of mappers to be launched next and so on until the whole of input has been processed or enough records have been collected to for the results, whichever is earlier. 
  
II)  "Here is some data, the remaining is yet to arrive, but you may start with it, and receive more input later"
     Consider a chain of 2 M-R jobs chained together such that the latter reads the output of the former. The second MR job cannot be started until the first has finished completely. This is essentially because Hadoop needs to be told the complete information about the input before beginning the job. 
    The first M-R has produced enough data ( not finished yet) that can be processed by another MR job and hence the other MR need not wait to grab the whole of input before beginning.  Input splits could be supplied later , but ofcourse before the copy/shuffle phase.
 

III)  " Input data has undergone one round of processing by map phase, have some stats, can now say of the resources 
        required further" 
       Mappers can produce useful stats about of their output, like the cardinality or produce a histogram describing distribution of output . These stats are available to the job provider (Hive/Pig/End User) who can 
      now determine with better accuracy of the resources (memory requirements ) required in reduction phase,  and even the number    of  reducers or may even alter the reduction logic by altering the reducer class parameter. 

 
 In a nut shell, certain parameters about a job are governed by the input data and  the intermediate results produced and hence need to be overridden as job progresses. Hadoop does not allow such information to be fed dynamically. Hence job execution may not always be optimal. 

I would like to get feedback from the Hadoop community about the above proposal and if any similar effort is already underway. 
If we agree, as a next step I would like to discuss the implementation details that I have worked out end-to-end. 


-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Updated: (MAPREDUCE-1928) Dynamic information fed into Hadoop to control execution of a submiited job

Posted by "Raman Grover (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/MAPREDUCE-1928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Raman Grover updated MAPREDUCE-1928:
------------------------------------

    Description: 
Currently the job submission protocol requires the job provider to put every bit of information inside an instance of JobConf. The submitted information includes the input data (hdfs path) , suspected resource requirement, number of reducers etc.  This information is read by JobTracker as part of job initialization. Once initialized, job is moved into a running state. From this point, there is no mechanism for any additional information to be fed into Hadoop infrastructure for controlling the job execution. 
                           The execution pattern for the job looks very much static from this point. Using the size of input data and a few settings inside JobConf, number of mappers is computed. Hadoop attempts at reading the whole of data in parallel by launching parallel map tasks. Once map phase is over, a known number of reduce tasks (supplied as part of  JobConf) are started. 

Parameters that control the job execution were set in JobConf prior to reading the input data. As the map phase progresses, useful information based upon the content of the input data surfaces and can be used in controlling the further execution of the job. Let us walk through some of the examples where additional information can be fed to Hadoop subsequent to job submission for optimal execution of the job. 

1) "Process a part of the input , based upon the results decide if reading more input is required " 
    In a huge data set, user is interested in finding 'k' records that satisfy a predicate, essentially sampling the data. In current implementation, as the data is huge, a large no of mappers would be launched consuming a significant fraction of the available map slots in the cluster. Each map task would attempt at emitting a max of  'k' records. With N mappers, we get N*k records out of which one can pick any k to form the final result. 
   This is not optimal as:
   1)  A larger number of map slots get occupied initially, affecting other jobs in the queue. 
   2) If the selectivity of input data is very low, we essentially did not need scanning the whole of data to form our result. 
        we could have finished by reading a fraction of input data, monitoring the cardinality of the map output and determining if 
       more input needs to be processed.  
   
   Optimal way: If reading the whole of input requires N mappers, launch only 'M' initially. Allow them to complete. Based upon the statistics collected, decide additional number of mappers to be launched next and so on until the whole of input has been processed or enough records have been collected to for the results, whichever is earlier. 
  
II)  "Here is some data, the remaining is yet to arrive, but you may start with it, and receive more input later"
     Consider a chain of 2 M-R jobs chained together such that the latter reads the output of the former. The second MR job cannot be started until the first has finished completely. This is essentially because Hadoop needs to be told the complete information about the input before beginning the job. 
    The first M-R has produced enough data ( not finished yet) that can be processed by another MR job and hence the other MR need not wait to grab the whole of input before beginning.  Input splits could be supplied later , but ofcourse before the copy/shuffle phase.
 

III)  " Input data has undergone one round of processing by map phase, have some stats, can now say of the resources 
        required further" 
       Mappers can produce useful stats about of their output, like the cardinality or produce a histogram describing distribution of output . These stats are available to the job provider (Hive/Pig/End User) who can 
      now determine with better accuracy of the resources (memory requirements ) required in reduction phase,  and even the number    of  reducers or may even alter the reduction logic by altering the reducer class parameter. 

 
 In a nut shell, certain parameters about a job are governed by the input data and  the intermediate results produced and hence need to be overridden as job progresses. Hadoop does not allow such information to be fed dynamically. Hence job execution may not always be optimal. 

I would like to get feedback from the Hadoop community about the above proposal and if any similar effort is already underway. 
If we agree, as a next step I would like to discuss the implementation details that I have worked out end-to-end. 


  was:
Currently the job submission protocol requires the job provider to put every bit of information inside an instance of JobConf. The submitted information includes the input data (hdfs path) , suspected resource requirement, number of reducers etc.  This information is read by JobTracker as part of job initialization. Once initialized, job is moved into a running state. From this point, there is no mechanism for any additional information to be fed into Hadoop infrastructure for controlling the job execution. 
                           The execution pattern for the job looks very much static from this point. Using the size of input data and a few settings inside JobConf, number of mappers is computed. Hadoop attempts at reading the whole of data in parallel by launching parallel map tasks. Once map phase is over, a known number of reduce tasks (supplied as part of  JobConf) are started. 

Parameters that control the job execution were set in JobConf prior to reading the input data. As the map phase progresses, useful information based upon the content of the input data surfaces and can be used in controlling the further execution of the job. Let us walk through some of the examples where additional information can be fed to Hadoop subsequent to job submission for optimal execution of the job. 

1) "Process a part of the input , based upon the results decide if reading more input is required " 
    In a huge data set, user is interested in finding 'k' records that satisfy a predicate, essentially sampling the data. In current implementation, as the data is huge, a large no of mappers would be launched consuming a significant fraction of the available map slots in the cluster. Each map task would attempt at emitting a max of  'k' records. With N mappers, we get N*k records out of which one can pick any k to form the final result. 
   This is not optimal as:
   (i)  A larger number of map slots get occupied initially, affecting other jobs in the queue. 
   (ii) If the selectivity of input data is very low, we essentially did not need scanning the whole of data to form our result. 
        we could have finished by reading a fraction of input data, monitoring the cardinality of the map output and determining if 
       more input needs to be processed.  
   
   Optimal way: If reading the whole of input requires N mappers, launch only 'M' initially. Allow them to complete. Based upon the statistics collected, decide additional number of mappers to be launched next and so on until the whole of input has been processed or enough records have been collected to for the results, whichever is earlier. 
  
II)  "Here is some data, the remaining is yet to arrive, but you may start with it, and receive more input later"
     Consider a chain of 2 M-R jobs chained together such that the latter reads the output of the former. The second MR job cannot be started until the first has finished completely. This is essentially because Hadoop needs to be told the complete information about the input before beginning the job. 
    The first M-R has produced enough data ( not finished yet) that can be processed by another MR job and hence the other MR need not wait to grab the whole of input before beginning.  Input splits could be supplied later , but ofcourse before the copy/shuffle phase.
 

III)  " Input data has undergone one round of processing by map phase, have some stats, can now say of the resources 
        required further" 
       Mappers can produce useful stats about of their output, like the cardinality or produce a histogram describing distribution of output . These stats are available to the job provider (Hive/Pig/End User) who can 
      now determine with better accuracy of the resources (memory requirements ) required in reduction phase,  and even the number    of  reducers or may even alter the reduction logic by altering the reducer class parameter. 

 
 In a nut shell, certain parameters about a job are governed by the input data and  the intermediate results produced and hence need to be overridden as job progresses. Hadoop does not allow such information to be fed dynamically. Hence job execution may not always be optimal. 

I would like to get feedback from the Hadoop community about the above proposal and if any similar effort is already underway. 
If we agree, as a next step I would like to discuss the implementation details that I have worked out end-to-end. 



> Dynamic information fed into Hadoop to control execution of a submiited job
> ---------------------------------------------------------------------------
>
>                 Key: MAPREDUCE-1928
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-1928
>             Project: Hadoop Map/Reduce
>          Issue Type: New Feature
>          Components: job submission, jobtracker, tasktracker
>    Affects Versions: 0.20.3
>            Reporter: Raman Grover
>   Original Estimate: 1680h
>  Remaining Estimate: 1680h
>
> Currently the job submission protocol requires the job provider to put every bit of information inside an instance of JobConf. The submitted information includes the input data (hdfs path) , suspected resource requirement, number of reducers etc.  This information is read by JobTracker as part of job initialization. Once initialized, job is moved into a running state. From this point, there is no mechanism for any additional information to be fed into Hadoop infrastructure for controlling the job execution. 
>                            The execution pattern for the job looks very much static from this point. Using the size of input data and a few settings inside JobConf, number of mappers is computed. Hadoop attempts at reading the whole of data in parallel by launching parallel map tasks. Once map phase is over, a known number of reduce tasks (supplied as part of  JobConf) are started. 
> Parameters that control the job execution were set in JobConf prior to reading the input data. As the map phase progresses, useful information based upon the content of the input data surfaces and can be used in controlling the further execution of the job. Let us walk through some of the examples where additional information can be fed to Hadoop subsequent to job submission for optimal execution of the job. 
> 1) "Process a part of the input , based upon the results decide if reading more input is required " 
>     In a huge data set, user is interested in finding 'k' records that satisfy a predicate, essentially sampling the data. In current implementation, as the data is huge, a large no of mappers would be launched consuming a significant fraction of the available map slots in the cluster. Each map task would attempt at emitting a max of  'k' records. With N mappers, we get N*k records out of which one can pick any k to form the final result. 
>    This is not optimal as:
>    1)  A larger number of map slots get occupied initially, affecting other jobs in the queue. 
>    2) If the selectivity of input data is very low, we essentially did not need scanning the whole of data to form our result. 
>         we could have finished by reading a fraction of input data, monitoring the cardinality of the map output and determining if 
>        more input needs to be processed.  
>    
>    Optimal way: If reading the whole of input requires N mappers, launch only 'M' initially. Allow them to complete. Based upon the statistics collected, decide additional number of mappers to be launched next and so on until the whole of input has been processed or enough records have been collected to for the results, whichever is earlier. 
>   
> II)  "Here is some data, the remaining is yet to arrive, but you may start with it, and receive more input later"
>      Consider a chain of 2 M-R jobs chained together such that the latter reads the output of the former. The second MR job cannot be started until the first has finished completely. This is essentially because Hadoop needs to be told the complete information about the input before beginning the job. 
>     The first M-R has produced enough data ( not finished yet) that can be processed by another MR job and hence the other MR need not wait to grab the whole of input before beginning.  Input splits could be supplied later , but ofcourse before the copy/shuffle phase.
>  
> III)  " Input data has undergone one round of processing by map phase, have some stats, can now say of the resources 
>         required further" 
>        Mappers can produce useful stats about of their output, like the cardinality or produce a histogram describing distribution of output . These stats are available to the job provider (Hive/Pig/End User) who can 
>       now determine with better accuracy of the resources (memory requirements ) required in reduction phase,  and even the number    of  reducers or may even alter the reduction logic by altering the reducer class parameter. 
>  
>  In a nut shell, certain parameters about a job are governed by the input data and  the intermediate results produced and hence need to be overridden as job progresses. Hadoop does not allow such information to be fed dynamically. Hence job execution may not always be optimal. 
> I would like to get feedback from the Hadoop community about the above proposal and if any similar effort is already underway. 
> If we agree, as a next step I would like to discuss the implementation details that I have worked out end-to-end. 

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Commented: (MAPREDUCE-1928) Dynamic information fed into Hadoop for controlling execution of a submitted job

Posted by "Joydeep Sen Sarma (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/MAPREDUCE-1928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12888503#action_12888503 ] 

Joydeep Sen Sarma commented on MAPREDUCE-1928:
----------------------------------------------

to add to #1 - we may be able to change the split size based on the observed selectivity of an ongoing job (ie. add splits with larger/smaller size depending on stats from the first set of splits). It's possible that Hadoop may want to do this as part of the basic framework (by exploiting any mechanisms provided here).

This is a huge win for a framework like Hive. It would drastically reduce the amount of wasted work (limit N queries) and spawning unnecessarily large number of mappers (unknown selectivity) - just to name to obvious use cases. 

Can you supply a more concrete proposal in terms of api changes?

> Dynamic information fed into Hadoop for controlling execution of a submitted job
> --------------------------------------------------------------------------------
>
>                 Key: MAPREDUCE-1928
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-1928
>             Project: Hadoop Map/Reduce
>          Issue Type: New Feature
>          Components: job submission, jobtracker, tasktracker
>    Affects Versions: 0.20.3
>            Reporter: Raman Grover
>   Original Estimate: 2016h
>  Remaining Estimate: 2016h
>
> Currently the job submission protocol requires the job provider to put every bit of information inside an instance of JobConf. The submitted information includes the input data (hdfs path) , suspected resource requirement, number of reducers etc.  This information is read by JobTracker as part of job initialization. Once initialized, job is moved into a running state. From this point, there is no mechanism for any additional information to be fed into Hadoop infrastructure for controlling the job execution. 
>                            The execution pattern for the job looks very much static from this point. Using the size of input data and a few settings inside JobConf, number of mappers is computed. Hadoop attempts at reading the whole of data in parallel by launching parallel map tasks. Once map phase is over, a known number of reduce tasks (supplied as part of  JobConf) are started. 
> Parameters that control the job execution were set in JobConf prior to reading the input data. As the map phase progresses, useful information based upon the content of the input data surfaces and can be used in controlling the further execution of the job. Let us walk through some of the examples where additional information can be fed to Hadoop subsequent to job submission for optimal execution of the job. 
> I) "Process a part of the input , based upon the results decide if reading more input is required " 
>     In a huge data set, user is interested in finding 'k' records that satisfy a predicate, essentially sampling the data. In current implementation, as the data is huge, a large no of mappers would be launched consuming a significant fraction of the available map slots in the cluster. Each map task would attempt at emitting a max of  'k' records. With N mappers, we get N*k records out of which one can pick any k to form the final result. 
>    This is not optimal as:
>    1)  A larger number of map slots get occupied initially, affecting other jobs in the queue. 
>    2) If the selectivity of input data is very low, we essentially did not need scanning the whole of data to form our result. 
>         we could have finished by reading a fraction of input data, monitoring the cardinality of the map output and determining if 
>        more input needs to be processed.  
>    
>    Optimal way: If reading the whole of input requires N mappers, launch only 'M' initially. Allow them to complete. Based upon the statistics collected, decide additional number of mappers to be launched next and so on until the whole of input has been processed or enough records have been collected to for the results, whichever is earlier. 
>  
>  
> II)  "Here is some data, the remaining is yet to arrive, but you may start with it, and receive more input later"
>      Consider a chain of 2 M-R jobs chained together such that the latter reads the output of the former. The second MR job cannot be started until the first has finished completely. This is essentially because Hadoop needs to be told the complete information about the input before beginning the job. 
>     The first M-R has produced enough data ( not finished yet) that can be processed by another MR job and hence the other MR need not wait to grab the whole of input before beginning.  Input splits could be supplied later , but ofcourse before the copy/shuffle phase.
>  
> III)  " Input data has undergone one round of processing by map phase, have some stats, can now say of the resources 
>         required further" 
>        Mappers can produce useful stats about of their output, like the cardinality or produce a histogram describing distribution of output . These stats are available to the job provider (Hive/Pig/End User) who can 
>       now determine with better accuracy of the resources (memory requirements ) required in reduction phase,  and even the number    of  reducers or may even alter the reduction logic by altering the reducer class parameter. 
>  
> In a nut shell, certain parameters about a job are governed by the input data and  the intermediate results produced and hence need to be overridden as job progresses. Hadoop does not allow such information to be fed dynamically. Hence job execution may not always be optimal. 
> I would like to get feedback from the Hadoop community about the above proposal and if any similar effort is already underway. 
> If we agree, as a next step I would like to discuss the implementation details that I have worked out end-to-end. 

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Updated: (MAPREDUCE-1928) Dynamic information fed into Hadoop to control execution of a submiited job

Posted by "Raman Grover (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/MAPREDUCE-1928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Raman Grover updated MAPREDUCE-1928:
------------------------------------

     Original Estimate: 2016h  (was: 1680h)
    Remaining Estimate: 2016h  (was: 1680h)

> Dynamic information fed into Hadoop to control execution of a submiited job
> ---------------------------------------------------------------------------
>
>                 Key: MAPREDUCE-1928
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-1928
>             Project: Hadoop Map/Reduce
>          Issue Type: New Feature
>          Components: job submission, jobtracker, tasktracker
>    Affects Versions: 0.20.3
>            Reporter: Raman Grover
>   Original Estimate: 2016h
>  Remaining Estimate: 2016h
>
> Currently the job submission protocol requires the job provider to put every bit of information inside an instance of JobConf. The submitted information includes the input data (hdfs path) , suspected resource requirement, number of reducers etc.  This information is read by JobTracker as part of job initialization. Once initialized, job is moved into a running state. From this point, there is no mechanism for any additional information to be fed into Hadoop infrastructure for controlling the job execution. 
>                            The execution pattern for the job looks very much static from this point. Using the size of input data and a few settings inside JobConf, number of mappers is computed. Hadoop attempts at reading the whole of data in parallel by launching parallel map tasks. Once map phase is over, a known number of reduce tasks (supplied as part of  JobConf) are started. 
> Parameters that control the job execution were set in JobConf prior to reading the input data. As the map phase progresses, useful information based upon the content of the input data surfaces and can be used in controlling the further execution of the job. Let us walk through some of the examples where additional information can be fed to Hadoop subsequent to job submission for optimal execution of the job. 
> I) "Process a part of the input , based upon the results decide if reading more input is required " 
>     In a huge data set, user is interested in finding 'k' records that satisfy a predicate, essentially sampling the data. In current implementation, as the data is huge, a large no of mappers would be launched consuming a significant fraction of the available map slots in the cluster. Each map task would attempt at emitting a max of  'k' records. With N mappers, we get N*k records out of which one can pick any k to form the final result. 
>    This is not optimal as:
>    1)  A larger number of map slots get occupied initially, affecting other jobs in the queue. 
>    2) If the selectivity of input data is very low, we essentially did not need scanning the whole of data to form our result. 
>         we could have finished by reading a fraction of input data, monitoring the cardinality of the map output and determining if 
>        more input needs to be processed.  
>    
>    Optimal way: If reading the whole of input requires N mappers, launch only 'M' initially. Allow them to complete. Based upon the statistics collected, decide additional number of mappers to be launched next and so on until the whole of input has been processed or enough records have been collected to for the results, whichever is earlier. 
>  
>  
> II)  "Here is some data, the remaining is yet to arrive, but you may start with it, and receive more input later"
>      Consider a chain of 2 M-R jobs chained together such that the latter reads the output of the former. The second MR job cannot be started until the first has finished completely. This is essentially because Hadoop needs to be told the complete information about the input before beginning the job. 
>     The first M-R has produced enough data ( not finished yet) that can be processed by another MR job and hence the other MR need not wait to grab the whole of input before beginning.  Input splits could be supplied later , but ofcourse before the copy/shuffle phase.
>  
> III)  " Input data has undergone one round of processing by map phase, have some stats, can now say of the resources 
>         required further" 
>        Mappers can produce useful stats about of their output, like the cardinality or produce a histogram describing distribution of output . These stats are available to the job provider (Hive/Pig/End User) who can 
>       now determine with better accuracy of the resources (memory requirements ) required in reduction phase,  and even the number    of  reducers or may even alter the reduction logic by altering the reducer class parameter. 
>  
> In a nut shell, certain parameters about a job are governed by the input data and  the intermediate results produced and hence need to be overridden as job progresses. Hadoop does not allow such information to be fed dynamically. Hence job execution may not always be optimal. 
> I would like to get feedback from the Hadoop community about the above proposal and if any similar effort is already underway. 
> If we agree, as a next step I would like to discuss the implementation details that I have worked out end-to-end. 

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Commented: (MAPREDUCE-1928) Dynamic information fed into Hadoop for controlling execution of a submitted job

Posted by "Ted Yu (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/MAPREDUCE-1928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12886853#action_12886853 ] 

Ted Yu commented on MAPREDUCE-1928:
-----------------------------------

Case 2 is related to MAPREDUCE-1849
One possibility is to combine the two MapReduces into one during the optimization step.

> Dynamic information fed into Hadoop for controlling execution of a submitted job
> --------------------------------------------------------------------------------
>
>                 Key: MAPREDUCE-1928
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-1928
>             Project: Hadoop Map/Reduce
>          Issue Type: New Feature
>          Components: job submission, jobtracker, tasktracker
>    Affects Versions: 0.20.3
>            Reporter: Raman Grover
>   Original Estimate: 2016h
>  Remaining Estimate: 2016h
>
> Currently the job submission protocol requires the job provider to put every bit of information inside an instance of JobConf. The submitted information includes the input data (hdfs path) , suspected resource requirement, number of reducers etc.  This information is read by JobTracker as part of job initialization. Once initialized, job is moved into a running state. From this point, there is no mechanism for any additional information to be fed into Hadoop infrastructure for controlling the job execution. 
>                            The execution pattern for the job looks very much static from this point. Using the size of input data and a few settings inside JobConf, number of mappers is computed. Hadoop attempts at reading the whole of data in parallel by launching parallel map tasks. Once map phase is over, a known number of reduce tasks (supplied as part of  JobConf) are started. 
> Parameters that control the job execution were set in JobConf prior to reading the input data. As the map phase progresses, useful information based upon the content of the input data surfaces and can be used in controlling the further execution of the job. Let us walk through some of the examples where additional information can be fed to Hadoop subsequent to job submission for optimal execution of the job. 
> I) "Process a part of the input , based upon the results decide if reading more input is required " 
>     In a huge data set, user is interested in finding 'k' records that satisfy a predicate, essentially sampling the data. In current implementation, as the data is huge, a large no of mappers would be launched consuming a significant fraction of the available map slots in the cluster. Each map task would attempt at emitting a max of  'k' records. With N mappers, we get N*k records out of which one can pick any k to form the final result. 
>    This is not optimal as:
>    1)  A larger number of map slots get occupied initially, affecting other jobs in the queue. 
>    2) If the selectivity of input data is very low, we essentially did not need scanning the whole of data to form our result. 
>         we could have finished by reading a fraction of input data, monitoring the cardinality of the map output and determining if 
>        more input needs to be processed.  
>    
>    Optimal way: If reading the whole of input requires N mappers, launch only 'M' initially. Allow them to complete. Based upon the statistics collected, decide additional number of mappers to be launched next and so on until the whole of input has been processed or enough records have been collected to for the results, whichever is earlier. 
>  
>  
> II)  "Here is some data, the remaining is yet to arrive, but you may start with it, and receive more input later"
>      Consider a chain of 2 M-R jobs chained together such that the latter reads the output of the former. The second MR job cannot be started until the first has finished completely. This is essentially because Hadoop needs to be told the complete information about the input before beginning the job. 
>     The first M-R has produced enough data ( not finished yet) that can be processed by another MR job and hence the other MR need not wait to grab the whole of input before beginning.  Input splits could be supplied later , but ofcourse before the copy/shuffle phase.
>  
> III)  " Input data has undergone one round of processing by map phase, have some stats, can now say of the resources 
>         required further" 
>        Mappers can produce useful stats about of their output, like the cardinality or produce a histogram describing distribution of output . These stats are available to the job provider (Hive/Pig/End User) who can 
>       now determine with better accuracy of the resources (memory requirements ) required in reduction phase,  and even the number    of  reducers or may even alter the reduction logic by altering the reducer class parameter. 
>  
> In a nut shell, certain parameters about a job are governed by the input data and  the intermediate results produced and hence need to be overridden as job progresses. Hadoop does not allow such information to be fed dynamically. Hence job execution may not always be optimal. 
> I would like to get feedback from the Hadoop community about the above proposal and if any similar effort is already underway. 
> If we agree, as a next step I would like to discuss the implementation details that I have worked out end-to-end. 

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Commented: (MAPREDUCE-1928) Dynamic information fed into Hadoop for controlling execution of a submitted job

Posted by "Steven Lewis (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/MAPREDUCE-1928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12888332#action_12888332 ] 

Steven Lewis commented on MAPREDUCE-1928:
-----------------------------------------

Another possible use has to do with adjusting parameters to avoid failures. I have an issue where a reducer is running out of memory. If I was aware that 
certain  keys lead to this failure I could take steps such as sampling data rather than processing the whole set do I would add access to data about failures

> Dynamic information fed into Hadoop for controlling execution of a submitted job
> --------------------------------------------------------------------------------
>
>                 Key: MAPREDUCE-1928
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-1928
>             Project: Hadoop Map/Reduce
>          Issue Type: New Feature
>          Components: job submission, jobtracker, tasktracker
>    Affects Versions: 0.20.3
>            Reporter: Raman Grover
>   Original Estimate: 2016h
>  Remaining Estimate: 2016h
>
> Currently the job submission protocol requires the job provider to put every bit of information inside an instance of JobConf. The submitted information includes the input data (hdfs path) , suspected resource requirement, number of reducers etc.  This information is read by JobTracker as part of job initialization. Once initialized, job is moved into a running state. From this point, there is no mechanism for any additional information to be fed into Hadoop infrastructure for controlling the job execution. 
>                            The execution pattern for the job looks very much static from this point. Using the size of input data and a few settings inside JobConf, number of mappers is computed. Hadoop attempts at reading the whole of data in parallel by launching parallel map tasks. Once map phase is over, a known number of reduce tasks (supplied as part of  JobConf) are started. 
> Parameters that control the job execution were set in JobConf prior to reading the input data. As the map phase progresses, useful information based upon the content of the input data surfaces and can be used in controlling the further execution of the job. Let us walk through some of the examples where additional information can be fed to Hadoop subsequent to job submission for optimal execution of the job. 
> I) "Process a part of the input , based upon the results decide if reading more input is required " 
>     In a huge data set, user is interested in finding 'k' records that satisfy a predicate, essentially sampling the data. In current implementation, as the data is huge, a large no of mappers would be launched consuming a significant fraction of the available map slots in the cluster. Each map task would attempt at emitting a max of  'k' records. With N mappers, we get N*k records out of which one can pick any k to form the final result. 
>    This is not optimal as:
>    1)  A larger number of map slots get occupied initially, affecting other jobs in the queue. 
>    2) If the selectivity of input data is very low, we essentially did not need scanning the whole of data to form our result. 
>         we could have finished by reading a fraction of input data, monitoring the cardinality of the map output and determining if 
>        more input needs to be processed.  
>    
>    Optimal way: If reading the whole of input requires N mappers, launch only 'M' initially. Allow them to complete. Based upon the statistics collected, decide additional number of mappers to be launched next and so on until the whole of input has been processed or enough records have been collected to for the results, whichever is earlier. 
>  
>  
> II)  "Here is some data, the remaining is yet to arrive, but you may start with it, and receive more input later"
>      Consider a chain of 2 M-R jobs chained together such that the latter reads the output of the former. The second MR job cannot be started until the first has finished completely. This is essentially because Hadoop needs to be told the complete information about the input before beginning the job. 
>     The first M-R has produced enough data ( not finished yet) that can be processed by another MR job and hence the other MR need not wait to grab the whole of input before beginning.  Input splits could be supplied later , but ofcourse before the copy/shuffle phase.
>  
> III)  " Input data has undergone one round of processing by map phase, have some stats, can now say of the resources 
>         required further" 
>        Mappers can produce useful stats about of their output, like the cardinality or produce a histogram describing distribution of output . These stats are available to the job provider (Hive/Pig/End User) who can 
>       now determine with better accuracy of the resources (memory requirements ) required in reduction phase,  and even the number    of  reducers or may even alter the reduction logic by altering the reducer class parameter. 
>  
> In a nut shell, certain parameters about a job are governed by the input data and  the intermediate results produced and hence need to be overridden as job progresses. Hadoop does not allow such information to be fed dynamically. Hence job execution may not always be optimal. 
> I would like to get feedback from the Hadoop community about the above proposal and if any similar effort is already underway. 
> If we agree, as a next step I would like to discuss the implementation details that I have worked out end-to-end. 

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Updated: (MAPREDUCE-1928) Dynamic information fed into Hadoop for controlling execution of a submitted job

Posted by "Raman Grover (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/MAPREDUCE-1928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Raman Grover updated MAPREDUCE-1928:
------------------------------------

    Summary: Dynamic information fed into Hadoop for controlling execution of a submitted job  (was: Dynamic information fed into Hadoop to control execution of a submiited job)

> Dynamic information fed into Hadoop for controlling execution of a submitted job
> --------------------------------------------------------------------------------
>
>                 Key: MAPREDUCE-1928
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-1928
>             Project: Hadoop Map/Reduce
>          Issue Type: New Feature
>          Components: job submission, jobtracker, tasktracker
>    Affects Versions: 0.20.3
>            Reporter: Raman Grover
>   Original Estimate: 2016h
>  Remaining Estimate: 2016h
>
> Currently the job submission protocol requires the job provider to put every bit of information inside an instance of JobConf. The submitted information includes the input data (hdfs path) , suspected resource requirement, number of reducers etc.  This information is read by JobTracker as part of job initialization. Once initialized, job is moved into a running state. From this point, there is no mechanism for any additional information to be fed into Hadoop infrastructure for controlling the job execution. 
>                            The execution pattern for the job looks very much static from this point. Using the size of input data and a few settings inside JobConf, number of mappers is computed. Hadoop attempts at reading the whole of data in parallel by launching parallel map tasks. Once map phase is over, a known number of reduce tasks (supplied as part of  JobConf) are started. 
> Parameters that control the job execution were set in JobConf prior to reading the input data. As the map phase progresses, useful information based upon the content of the input data surfaces and can be used in controlling the further execution of the job. Let us walk through some of the examples where additional information can be fed to Hadoop subsequent to job submission for optimal execution of the job. 
> I) "Process a part of the input , based upon the results decide if reading more input is required " 
>     In a huge data set, user is interested in finding 'k' records that satisfy a predicate, essentially sampling the data. In current implementation, as the data is huge, a large no of mappers would be launched consuming a significant fraction of the available map slots in the cluster. Each map task would attempt at emitting a max of  'k' records. With N mappers, we get N*k records out of which one can pick any k to form the final result. 
>    This is not optimal as:
>    1)  A larger number of map slots get occupied initially, affecting other jobs in the queue. 
>    2) If the selectivity of input data is very low, we essentially did not need scanning the whole of data to form our result. 
>         we could have finished by reading a fraction of input data, monitoring the cardinality of the map output and determining if 
>        more input needs to be processed.  
>    
>    Optimal way: If reading the whole of input requires N mappers, launch only 'M' initially. Allow them to complete. Based upon the statistics collected, decide additional number of mappers to be launched next and so on until the whole of input has been processed or enough records have been collected to for the results, whichever is earlier. 
>  
>  
> II)  "Here is some data, the remaining is yet to arrive, but you may start with it, and receive more input later"
>      Consider a chain of 2 M-R jobs chained together such that the latter reads the output of the former. The second MR job cannot be started until the first has finished completely. This is essentially because Hadoop needs to be told the complete information about the input before beginning the job. 
>     The first M-R has produced enough data ( not finished yet) that can be processed by another MR job and hence the other MR need not wait to grab the whole of input before beginning.  Input splits could be supplied later , but ofcourse before the copy/shuffle phase.
>  
> III)  " Input data has undergone one round of processing by map phase, have some stats, can now say of the resources 
>         required further" 
>        Mappers can produce useful stats about of their output, like the cardinality or produce a histogram describing distribution of output . These stats are available to the job provider (Hive/Pig/End User) who can 
>       now determine with better accuracy of the resources (memory requirements ) required in reduction phase,  and even the number    of  reducers or may even alter the reduction logic by altering the reducer class parameter. 
>  
> In a nut shell, certain parameters about a job are governed by the input data and  the intermediate results produced and hence need to be overridden as job progresses. Hadoop does not allow such information to be fed dynamically. Hence job execution may not always be optimal. 
> I would like to get feedback from the Hadoop community about the above proposal and if any similar effort is already underway. 
> If we agree, as a next step I would like to discuss the implementation details that I have worked out end-to-end. 

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Updated: (MAPREDUCE-1928) Dynamic information fed into Hadoop to control execution of a submiited job

Posted by "Raman Grover (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/MAPREDUCE-1928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Raman Grover updated MAPREDUCE-1928:
------------------------------------

    Description: 
Currently the job submission protocol requires the job provider to put every bit of information inside an instance of JobConf. The submitted information includes the input data (hdfs path) , suspected resource requirement, number of reducers etc.  This information is read by JobTracker as part of job initialization. Once initialized, job is moved into a running state. From this point, there is no mechanism for any additional information to be fed into Hadoop infrastructure for controlling the job execution. 
                           The execution pattern for the job looks very much static from this point. Using the size of input data and a few settings inside JobConf, number of mappers is computed. Hadoop attempts at reading the whole of data in parallel by launching parallel map tasks. Once map phase is over, a known number of reduce tasks (supplied as part of  JobConf) are started. 

Parameters that control the job execution were set in JobConf prior to reading the input data. As the map phase progresses, useful information based upon the content of the input data surfaces and can be used in controlling the further execution of the job. Let us walk through some of the examples where additional information can be fed to Hadoop subsequent to job submission for optimal execution of the job. 

I) "Process a part of the input , based upon the results decide if reading more input is required " 
    In a huge data set, user is interested in finding 'k' records that satisfy a predicate, essentially sampling the data. In current implementation, as the data is huge, a large no of mappers would be launched consuming a significant fraction of the available map slots in the cluster. Each map task would attempt at emitting a max of  'k' records. With N mappers, we get N*k records out of which one can pick any k to form the final result. 
   This is not optimal as:
   1)  A larger number of map slots get occupied initially, affecting other jobs in the queue. 
   2) If the selectivity of input data is very low, we essentially did not need scanning the whole of data to form our result. 
        we could have finished by reading a fraction of input data, monitoring the cardinality of the map output and determining if 
       more input needs to be processed.  
   
   Optimal way: If reading the whole of input requires N mappers, launch only 'M' initially. Allow them to complete. Based upon the statistics collected, decide additional number of mappers to be launched next and so on until the whole of input has been processed or enough records have been collected to for the results, whichever is earlier. 
 
 
II)  "Here is some data, the remaining is yet to arrive, but you may start with it, and receive more input later"
     Consider a chain of 2 M-R jobs chained together such that the latter reads the output of the former. The second MR job cannot be started until the first has finished completely. This is essentially because Hadoop needs to be told the complete information about the input before beginning the job. 
    The first M-R has produced enough data ( not finished yet) that can be processed by another MR job and hence the other MR need not wait to grab the whole of input before beginning.  Input splits could be supplied later , but ofcourse before the copy/shuffle phase.
 

III)  " Input data has undergone one round of processing by map phase, have some stats, can now say of the resources 
        required further" 
       Mappers can produce useful stats about of their output, like the cardinality or produce a histogram describing distribution of output . These stats are available to the job provider (Hive/Pig/End User) who can 
      now determine with better accuracy of the resources (memory requirements ) required in reduction phase,  and even the number    of  reducers or may even alter the reduction logic by altering the reducer class parameter. 

 
In a nut shell, certain parameters about a job are governed by the input data and  the intermediate results produced and hence need to be overridden as job progresses. Hadoop does not allow such information to be fed dynamically. Hence job execution may not always be optimal. 

I would like to get feedback from the Hadoop community about the above proposal and if any similar effort is already underway. 
If we agree, as a next step I would like to discuss the implementation details that I have worked out end-to-end. 


  was:
Currently the job submission protocol requires the job provider to put every bit of information inside an instance of JobConf. The submitted information includes the input data (hdfs path) , suspected resource requirement, number of reducers etc.  This information is read by JobTracker as part of job initialization. Once initialized, job is moved into a running state. From this point, there is no mechanism for any additional information to be fed into Hadoop infrastructure for controlling the job execution. 
                           The execution pattern for the job looks very much static from this point. Using the size of input data and a few settings inside JobConf, number of mappers is computed. Hadoop attempts at reading the whole of data in parallel by launching parallel map tasks. Once map phase is over, a known number of reduce tasks (supplied as part of  JobConf) are started. 

Parameters that control the job execution were set in JobConf prior to reading the input data. As the map phase progresses, useful information based upon the content of the input data surfaces and can be used in controlling the further execution of the job. Let us walk through some of the examples where additional information can be fed to Hadoop subsequent to job submission for optimal execution of the job. 

1) "Process a part of the input , based upon the results decide if reading more input is required " 
    In a huge data set, user is interested in finding 'k' records that satisfy a predicate, essentially sampling the data. In current implementation, as the data is huge, a large no of mappers would be launched consuming a significant fraction of the available map slots in the cluster. Each map task would attempt at emitting a max of  'k' records. With N mappers, we get N*k records out of which one can pick any k to form the final result. 
   This is not optimal as:
   1)  A larger number of map slots get occupied initially, affecting other jobs in the queue. 
   2) If the selectivity of input data is very low, we essentially did not need scanning the whole of data to form our result. 
        we could have finished by reading a fraction of input data, monitoring the cardinality of the map output and determining if 
       more input needs to be processed.  
   
   Optimal way: If reading the whole of input requires N mappers, launch only 'M' initially. Allow them to complete. Based upon the statistics collected, decide additional number of mappers to be launched next and so on until the whole of input has been processed or enough records have been collected to for the results, whichever is earlier. 
  
II)  "Here is some data, the remaining is yet to arrive, but you may start with it, and receive more input later"
     Consider a chain of 2 M-R jobs chained together such that the latter reads the output of the former. The second MR job cannot be started until the first has finished completely. This is essentially because Hadoop needs to be told the complete information about the input before beginning the job. 
    The first M-R has produced enough data ( not finished yet) that can be processed by another MR job and hence the other MR need not wait to grab the whole of input before beginning.  Input splits could be supplied later , but ofcourse before the copy/shuffle phase.
 

III)  " Input data has undergone one round of processing by map phase, have some stats, can now say of the resources 
        required further" 
       Mappers can produce useful stats about of their output, like the cardinality or produce a histogram describing distribution of output . These stats are available to the job provider (Hive/Pig/End User) who can 
      now determine with better accuracy of the resources (memory requirements ) required in reduction phase,  and even the number    of  reducers or may even alter the reduction logic by altering the reducer class parameter. 

 
 In a nut shell, certain parameters about a job are governed by the input data and  the intermediate results produced and hence need to be overridden as job progresses. Hadoop does not allow such information to be fed dynamically. Hence job execution may not always be optimal. 

I would like to get feedback from the Hadoop community about the above proposal and if any similar effort is already underway. 
If we agree, as a next step I would like to discuss the implementation details that I have worked out end-to-end. 



> Dynamic information fed into Hadoop to control execution of a submiited job
> ---------------------------------------------------------------------------
>
>                 Key: MAPREDUCE-1928
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-1928
>             Project: Hadoop Map/Reduce
>          Issue Type: New Feature
>          Components: job submission, jobtracker, tasktracker
>    Affects Versions: 0.20.3
>            Reporter: Raman Grover
>   Original Estimate: 1680h
>  Remaining Estimate: 1680h
>
> Currently the job submission protocol requires the job provider to put every bit of information inside an instance of JobConf. The submitted information includes the input data (hdfs path) , suspected resource requirement, number of reducers etc.  This information is read by JobTracker as part of job initialization. Once initialized, job is moved into a running state. From this point, there is no mechanism for any additional information to be fed into Hadoop infrastructure for controlling the job execution. 
>                            The execution pattern for the job looks very much static from this point. Using the size of input data and a few settings inside JobConf, number of mappers is computed. Hadoop attempts at reading the whole of data in parallel by launching parallel map tasks. Once map phase is over, a known number of reduce tasks (supplied as part of  JobConf) are started. 
> Parameters that control the job execution were set in JobConf prior to reading the input data. As the map phase progresses, useful information based upon the content of the input data surfaces and can be used in controlling the further execution of the job. Let us walk through some of the examples where additional information can be fed to Hadoop subsequent to job submission for optimal execution of the job. 
> I) "Process a part of the input , based upon the results decide if reading more input is required " 
>     In a huge data set, user is interested in finding 'k' records that satisfy a predicate, essentially sampling the data. In current implementation, as the data is huge, a large no of mappers would be launched consuming a significant fraction of the available map slots in the cluster. Each map task would attempt at emitting a max of  'k' records. With N mappers, we get N*k records out of which one can pick any k to form the final result. 
>    This is not optimal as:
>    1)  A larger number of map slots get occupied initially, affecting other jobs in the queue. 
>    2) If the selectivity of input data is very low, we essentially did not need scanning the whole of data to form our result. 
>         we could have finished by reading a fraction of input data, monitoring the cardinality of the map output and determining if 
>        more input needs to be processed.  
>    
>    Optimal way: If reading the whole of input requires N mappers, launch only 'M' initially. Allow them to complete. Based upon the statistics collected, decide additional number of mappers to be launched next and so on until the whole of input has been processed or enough records have been collected to for the results, whichever is earlier. 
>  
>  
> II)  "Here is some data, the remaining is yet to arrive, but you may start with it, and receive more input later"
>      Consider a chain of 2 M-R jobs chained together such that the latter reads the output of the former. The second MR job cannot be started until the first has finished completely. This is essentially because Hadoop needs to be told the complete information about the input before beginning the job. 
>     The first M-R has produced enough data ( not finished yet) that can be processed by another MR job and hence the other MR need not wait to grab the whole of input before beginning.  Input splits could be supplied later , but ofcourse before the copy/shuffle phase.
>  
> III)  " Input data has undergone one round of processing by map phase, have some stats, can now say of the resources 
>         required further" 
>        Mappers can produce useful stats about of their output, like the cardinality or produce a histogram describing distribution of output . These stats are available to the job provider (Hive/Pig/End User) who can 
>       now determine with better accuracy of the resources (memory requirements ) required in reduction phase,  and even the number    of  reducers or may even alter the reduction logic by altering the reducer class parameter. 
>  
> In a nut shell, certain parameters about a job are governed by the input data and  the intermediate results produced and hence need to be overridden as job progresses. Hadoop does not allow such information to be fed dynamically. Hence job execution may not always be optimal. 
> I would like to get feedback from the Hadoop community about the above proposal and if any similar effort is already underway. 
> If we agree, as a next step I would like to discuss the implementation details that I have worked out end-to-end. 

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Updated: (MAPREDUCE-1928) Dynamic information fed into Hadoop to control execution of a submiited job

Posted by "Raman Grover (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/MAPREDUCE-1928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Raman Grover updated MAPREDUCE-1928:
------------------------------------

     Original Estimate: 1680h  (was: 2016h)
    Remaining Estimate: 1680h  (was: 2016h)
     Affects Version/s: 0.20.3
           Component/s: job submission
                        jobtracker
                        tasktracker

> Dynamic information fed into Hadoop to control execution of a submiited job
> ---------------------------------------------------------------------------
>
>                 Key: MAPREDUCE-1928
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-1928
>             Project: Hadoop Map/Reduce
>          Issue Type: New Feature
>          Components: job submission, jobtracker, tasktracker
>    Affects Versions: 0.20.3
>            Reporter: Raman Grover
>   Original Estimate: 1680h
>  Remaining Estimate: 1680h
>
> Currently the job submission protocol requires the job provider to put every bit of information inside an instance of JobConf. The submitted information includes the input data (hdfs path) , suspected resource requirement, number of reducers etc.  This information is read by JobTracker as part of job initialization. Once initialized, job is moved into a running state. From this point, there is no mechanism for any additional information to be fed into Hadoop infrastructure for controlling the job execution. 
>                            The execution pattern for the job looks very much static from this point. Using the size of input data and a few settings inside JobConf, number of mappers is computed. Hadoop attempts at reading the whole of data in parallel by launching parallel map tasks. Once map phase is over, a known number of reduce tasks (supplied as part of  JobConf) are started. 
> Parameters that control the job execution were set in JobConf prior to reading the input data. As the map phase progresses, useful information based upon the content of the input data surfaces and can be used in controlling the further execution of the job. Let us walk through some of the examples where additional information can be fed to Hadoop subsequent to job submission for optimal execution of the job. 
> 1) "Process a part of the input , based upon the results decide if reading more input is required " 
>     In a huge data set, user is interested in finding 'k' records that satisfy a predicate, essentially sampling the data. In current implementation, as the data is huge, a large no of mappers would be launched consuming a significant fraction of the available map slots in the cluster. Each map task would attempt at emitting a max of  'k' records. With N mappers, we get N*k records out of which one can pick any k to form the final result. 
>    This is not optimal as:
>    (i)  A larger number of map slots get occupied initially, affecting other jobs in the queue. 
>    (ii) If the selectivity of input data is very low, we essentially did not need scanning the whole of data to form our result. 
>         we could have finished by reading a fraction of input data, monitoring the cardinality of the map output and determining if 
>        more input needs to be processed.  
>    
>    Optimal way: If reading the whole of input requires N mappers, launch only 'M' initially. Allow them to complete. Based upon the statistics collected, decide additional number of mappers to be launched next and so on until the whole of input has been processed or enough records have been collected to for the results, whichever is earlier. 
>   
> II)  "Here is some data, the remaining is yet to arrive, but you may start with it, and receive more input later"
>      Consider a chain of 2 M-R jobs chained together such that the latter reads the output of the former. The second MR job cannot be started until the first has finished completely. This is essentially because Hadoop needs to be told the complete information about the input before beginning the job. 
>     The first M-R has produced enough data ( not finished yet) that can be processed by another MR job and hence the other MR need not wait to grab the whole of input before beginning.  Input splits could be supplied later , but ofcourse before the copy/shuffle phase.
>  
> III)  " Input data has undergone one round of processing by map phase, have some stats, can now say of the resources 
>         required further" 
>        Mappers can produce useful stats about of their output, like the cardinality or produce a histogram describing distribution of output . These stats are available to the job provider (Hive/Pig/End User) who can 
>       now determine with better accuracy of the resources (memory requirements ) required in reduction phase,  and even the number    of  reducers or may even alter the reduction logic by altering the reducer class parameter. 
>  
>  In a nut shell, certain parameters about a job are governed by the input data and  the intermediate results produced and hence need to be overridden as job progresses. Hadoop does not allow such information to be fed dynamically. Hence job execution may not always be optimal. 
> I would like to get feedback from the Hadoop community about the above proposal and if any similar effort is already underway. 
> If we agree, as a next step I would like to discuss the implementation details that I have worked out end-to-end. 

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.