Posted to common-user@hadoop.apache.org by Ricky Ho <rh...@adobe.com> on 2008/12/01 19:09:51 UTC

Hadoop Internal Architecture writeup (failure handling)

More questions on component failure handling.  Can anyone confirm (or correct) the following?

1) When a TaskTracker crashes, the JobTracker, not having heard its heartbeat within a timeout period, concludes that it has crashed and re-allocates its unfinished tasks to other TaskTrackers.  Correct?

2) If the original TaskTracker is just overloaded rather than crashed, what happens if it submits the task result afterwards?

3) When a DataNode crashes, the NameNode, not having heard its heartbeat within a timeout period, concludes that it has crashed.  The NameNode then gradually redistributes the data chunks of the failed DataNode to other DataNodes to comply with the replication factor.  Correct?

4) Now what if the crashed DataNode reboots after an hour and rejoins the cluster?  It will report to the NameNode which chunks it has.  How does the NameNode recognize which chunks are outdated?  By using some kind of version number?

5) After the NameNode detects that certain chunks are outdated, does it simply discard them (and garbage-collect them later), or does it try to bring them up to date (e.g. by sending all the deltas)?

6) When the NameNode crashes, no HDFS write can proceed, but HDFS reads can proceed if the client already has a chunk handle to the DataNode.  Correct?  This situation continues until the NameNode is recovered, after which things are back to normal.  Correct?

7) When the JobTracker crashes, all jobs that have not completed are discarded.  When the JobTracker is restarted, the client needs to resubmit all of those jobs.  Correct?

Rgds,
Ricky

-----Original Message-----
From: Amar Kamat [mailto:amarrk@yahoo-inc.com]
Sent: Sunday, November 30, 2008 8:02 PM
To: core-user@hadoop.apache.org
Subject: RE: Hadoop Internal Architecture writeup

Hey, nice work and nice writeup. Keep it up.
Comments inline.
Amar


-----Original Message-----
From: Ricky Ho [mailto:rho@adobe.com]
Sent: Fri 11/28/2008 9:45 AM
To: core-user@hadoop.apache.org
Subject: RE: Hadoop Internal Architecture writeup

Amar, thanks a lot.  This is exactly the kind of feedback that I am looking for ...  I have some more questions ...

==================
The jobclient while submitting the job calculates the split using
InputFormat which is specified by the user. Internally the InputFormat
might make use of dfs-block size, user-hinted num-maps etc. The
jobtracker is given 3 files
- job.xml : job control parameters
- job.split : the split file
- job.jar : user map-reduce code
==================
[Ricky]  What exactly does job.split contain?  I assume it contains the specification for each split (but not its data), such as the corresponding file and the byte range within that file.  Correct?
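
To make the assumption in the question concrete, here is a minimal Python sketch of what a per-split specification could look like: a file path plus a byte range, with no data. The names SplitSpec and compute_splits are hypothetical, and the fixed split size is a simplification; as noted above, the InputFormat may also use the dfs block size, user-hinted num-maps and so on.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class SplitSpec:
    """Hypothetical split descriptor: it names the data but does not hold it."""
    path: str    # file the split belongs to
    start: int   # byte offset of the first byte in the split
    length: int  # number of bytes in the split


def compute_splits(path: str, file_length: int, split_size: int) -> List[SplitSpec]:
    """Cut one file into consecutive byte ranges of at most split_size bytes."""
    if split_size <= 0:
        raise ValueError("split_size must be positive")
    splits = []
    start = 0
    while start < file_length:
        length = min(split_size, file_length - start)
        splits.append(SplitSpec(path, start, length))
        start += length
    return splits


if __name__ == "__main__":
    # A 150 MB file with a 64 MB split size yields three splits.
    mb = 1024 * 1024
    for spec in compute_splits("/user/demo/input.txt", 150 * mb, 64 * mb):
        print(spec)
```

Each SplitSpec would correspond to one map task, which matches the 1-1 mapping between splits and map tasks discussed further down in this thread.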

====================
This process is interleaved/parallelized. As soon as a map is done, the
JobTracker is notified. Once a tracker (with a reducer) asks for events,
these new events are passed. Hence the map output pulling (Shuffle
Phase) works in parallel with the Map Phase. Reduce Phase can start only
once all the (resp) map outputs are copied and merged.
=====================
[Ricky]  I am curious why the reduce execution can't start earlier (before all the map tasks have completed).  The value "iterator" inside the user-defined reduce() method could block while waiting for more map tasks to complete.  In other words, map() and reduce() could also proceed with pipeline parallelism.
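
One way to picture the constraint in the quoted paragraph is that reduce() is called once per key with all of that key's values, and any map may still emit a value for any key. The Python sketch below is an illustration only, not Hadoop code; the function name and the sample data are made up. It merges sorted map outputs and groups them by key, which gives complete groups only once every map output is present.

```python
import heapq
from itertools import groupby
from operator import itemgetter


def merge_and_group(map_outputs):
    """Merge per-map sorted (key, value) lists and yield (key, [values])."""
    merged = heapq.merge(*map_outputs, key=itemgetter(0))
    for key, pairs in groupby(merged, key=itemgetter(0)):
        yield key, [value for _, value in pairs]


if __name__ == "__main__":
    map1 = [("apple", 1), ("cherry", 1)]
    map2 = [("apple", 1), ("banana", 1)]
    map3 = [("apple", 1)]  # finishes last, yet contributes to the first key

    # With only two map outputs the group for "apple" is incomplete.
    print(dict(merge_and_group([map1, map2])))
    # With all three, every key sees its full list of values.
    print(dict(merge_and_group([map1, map2, map3])))
```

In this picture, a blocking iterator as suggested in the question would still have to wait for the last map before it could finish even the first key.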


======================
There is a 1-1 mapping between a split and a map task. Hence it will
re-run the map on the corresponding split.
======================
[Ricky]  Do you mean that if the job has 5000 splits, it requires 5000 TaskTracker VMs (one for each split)?

comment:
If the job has 5000 splits, then it requires 5000 VMs (one for each split). The TaskTracker is a framework daemon: a process (JVM) that manages tasks (processes that each process a split) on a node. A TaskTracker is identified by (node-hostname + port). A task is never executed inside the TaskTracker; a new JVM is spawned for it, so that faulty user code (map/reduce) cannot bring down the TaskTracker (a framework process). With hadoop-0.19 we have JVM reuse, so 5000 splits might require < 5000 VMs. Note that tasks near the end of the job might get speculated, which might add to the VM count.
Amar
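
The VM arithmetic in the comment above can be sketched in Python as follows. This is a rough lower bound under stated assumptions, not a description of how the framework schedules JVMs: tasks_per_jvm is a hypothetical parameter for how many tasks one JVM may run in sequence (1 meaning no reuse), and speculative attempts are passed in as a plain count.

```python
import math


def min_task_jvms(num_splits: int, tasks_per_jvm: int = 1,
                  speculative_attempts: int = 0) -> int:
    """Lower bound on the task JVMs needed for the map attempts of one job."""
    if tasks_per_jvm < 1:
        raise ValueError("tasks_per_jvm must be at least 1")
    # Every split needs one attempt; speculation adds extra attempts.
    attempts = num_splits + speculative_attempts
    # Each JVM can host at most tasks_per_jvm attempts, one after another.
    return math.ceil(attempts / tasks_per_jvm)


if __name__ == "__main__":
    print(min_task_jvms(5000))                           # no reuse
    print(min_task_jvms(5000, tasks_per_jvm=10))         # with reuse
    print(min_task_jvms(5000, speculative_attempts=20))  # with speculation
```

Without reuse the bound equals the number of attempts, which is the "5000 splits, 5000 VMs" case; reuse lowers it and speculation raises it.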


=======================
The client is unblocked once the job is submitted. The way it works is
as follows :
- jobclient requests the jobtracker for a unique job id
- jobclient does some sanity checks to see if the output folder exists
etc ...
- jobclient uploads job files (xml, jar, split) onto a known location
called System-Directory
========================
[Ricky]  Is this a well-known folder within HDFS?

This is set using "mapred.system.dir" during cluster startup (see hadoop-default.xml). It's a framework directory.
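
The submission steps quoted above can be sketched as a small Python simulation. Everything here is hypothetical: FakeJobTracker, submit_job and the dict standing in for the filesystem are made up for illustration, the directory layout under the system directory is an assumption, and the sanity check is reduced to rejecting an output folder that already exists.

```python
class FakeJobTracker:
    """Stand-in for the jobtracker: hands out ids and records submissions."""

    def __init__(self):
        self._next_id = 1
        self.submitted = []

    def new_job_id(self) -> str:
        job_id = f"job_{self._next_id:04d}"
        self._next_id += 1
        return job_id

    def submit(self, job_id: str) -> None:
        self.submitted.append(job_id)


def submit_job(tracker, fs, system_dir, output_dir, job_files):
    """Client-side submission steps; fs is a dict mapping path -> bytes."""
    # 1. Ask the jobtracker for a unique job id.
    job_id = tracker.new_job_id()
    # 2. Sanity check: refuse to run if the output folder is already there.
    if any(p == output_dir or p.startswith(output_dir + "/") for p in fs):
        raise FileExistsError(f"output folder {output_dir} already exists")
    # 3. Upload the job files (xml, jar, split) under the system directory.
    for name, content in job_files.items():
        fs[f"{system_dir}/{job_id}/{name}"] = content
    # Hand the job over; the client is unblocked from here on.
    tracker.submit(job_id)
    return job_id


if __name__ == "__main__":
    tracker, fs = FakeJobTracker(), {}
    files = {"job.xml": b"", "job.split": b"", "job.jar": b""}
    print(submit_job(tracker, fs, "/mapred/system", "/user/demo/out", files))
    print(sorted(fs))
```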



RE: Hadoop Internal Architecture writeup (failure handling)

Posted by Amar Kamat <am...@yahoo-inc.com>.
@Ques-7 : mapred.jobtracker.restart.recover is the parameter that enables the feature for hadoop versions >= 0.19. mapred.jobtracker.job.history.block.size controls the extent of recovery.
Amar

-----Original Message-----
From: Amar Kamat [mailto:amarrk@yahoo-inc.com]
Sent: Mon 12/1/2008 12:25 PM
To: core-user@hadoop.apache.org
Subject: RE: Hadoop Internal Architecture writeup (failure handling)
 
Comments inline.
Amar


-----Original Message-----
From: Ricky Ho [mailto:rho@adobe.com]
Sent: Mon 12/1/2008 10:09 AM
To: core-user@hadoop.apache.org
Subject: Hadoop Internal Architecture writeup (failure handling)
 
More questions on component failure handling.  Can anyone confirm (or correct) the following?

1) When a TaskTracker crashes, the JobTracker, not having heard its heartbeat within a timeout period, concludes that it has crashed and re-allocates its unfinished tasks to other TaskTrackers.  Correct?

All tasks (complete + incomplete) from running jobs whose output is on the local filesystem will be re-executed. Only incomplete tasks from running jobs whose output is on HDFS will be re-executed. It's also okay to say that maps (completed + incomplete) from running jobs with num-reducers > 0 will be re-executed.
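
The re-execution rule in the answer above can be written down as a small Python predicate. This is a sketch of the rule as stated, not JobTracker code: the Task type and function names are hypothetical, and it assumes that only maps of jobs with num-reducers > 0 keep their output on the local filesystem, while reduces and maps of map-only jobs write to HDFS.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Task:
    task_id: str
    kind: str          # "map" or "reduce"
    complete: bool
    num_reducers: int  # reducer count of the job this task belongs to


def output_on_local_fs(task: Task) -> bool:
    """Assumption: maps of jobs with reducers keep output on the local disk."""
    return task.kind == "map" and task.num_reducers > 0


def tasks_to_rerun(lost_tracker_tasks: List[Task]) -> List[Task]:
    """Select the tasks of a lost tracker that must be executed again."""
    return [
        t for t in lost_tracker_tasks
        # Local output dies with the tracker, so even completed tasks rerun;
        # output on HDFS survives, so only incomplete tasks rerun.
        if output_on_local_fs(t) or not t.complete
    ]


if __name__ == "__main__":
    lost = [
        Task("m1", "map", True, 4),      # completed map, local output
        Task("r1", "reduce", True, 4),   # completed reduce, output on HDFS
        Task("r2", "reduce", False, 4),  # incomplete reduce
        Task("m2", "map", True, 0),      # completed map of a map-only job
    ]
    print([t.task_id for t in tasks_to_rerun(lost)])
```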

2) If the original TaskTracker is just overloaded rather than crashed, what happens if it submits the task result afterwards?
The task (i.e. the attempt) that reports first will always be considered, and the other attempts will be killed.
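
A minimal Python sketch of the "first attempt to report wins" rule from the answer above. The TaskAttempts class and the attempt ids are hypothetical; the real bookkeeping in the JobTracker is certainly more involved.

```python
class TaskAttempts:
    """Tracks competing attempts of one task; the first to report wins."""

    def __init__(self, attempt_ids):
        self.running = set(attempt_ids)
        self.winner = None

    def report_done(self, attempt_id):
        """Return the set of attempt ids to kill as a result of this report."""
        if self.winner is not None:
            # A result was already accepted; the late reporter is discarded.
            self.running.discard(attempt_id)
            return {attempt_id}
        # First report: accept it and kill every other running attempt.
        self.winner = attempt_id
        to_kill = self.running - {attempt_id}
        self.running = set()
        return to_kill


if __name__ == "__main__":
    task = TaskAttempts(["attempt_0", "attempt_1"])
    # The re-allocated attempt finishes first; the overloaded one is killed.
    print(task.report_done("attempt_1"))
    # The overloaded tracker reports later and its result is ignored.
    print(task.report_done("attempt_0"))
    print(task.winner)
```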

3) When a DataNode crashes, the NameNode, not having heard its heartbeat within a timeout period, concludes that it has crashed.  The NameNode then gradually redistributes the data chunks of the failed DataNode to other DataNodes to comply with the replication factor.  Correct?

4) Now what if the crashed DataNode reboots after an hour and rejoins the cluster?  It will report to the NameNode which chunks it has.  How does the NameNode recognize which chunks are outdated?  By using some kind of version number?

5) After the NameNode detects that certain chunks are outdated, does it simply discard them (and garbage-collect them later), or does it try to bring them up to date (e.g. by sending all the deltas)?

6) When the NameNode crashes, no HDFS write can proceed, but HDFS reads can proceed if the client already has a chunk handle to the DataNode.  Correct?  This situation continues until the NameNode is recovered, after which things are back to normal.  Correct?

7) When the JobTracker crashes, all jobs that have not completed are discarded.  When the JobTracker is restarted, the client needs to resubmit all of those jobs.  Correct?
Yes. This is true for hadoop versions < 0.19. With hadoop-0.19 we have a fault-tolerant jobtracker, so all submitted jobs will be recovered and running jobs will be resumed. The extent of recovery might not be 100%.

Rgds,
Ricky





