Posted to hdfs-dev@hadoop.apache.org by "Sirianni, Eric" <Er...@netapp.com> on 2013/08/15 15:21:13 UTC

mapred replication

In debugging some replication issues in our HDFS environment, I noticed that the MapReduce framework uses the following algorithm for setting the replication on submitted job files:

1.     Create the file with *default* DFS replication factor (i.e. 'dfs.replication')

2.     Subsequently alter the replication of the file based on the 'mapred.submit.replication' config value

  private static FSDataOutputStream createFile(FileSystem fs, Path splitFile,
      Configuration job)  throws IOException {
    FSDataOutputStream out = FileSystem.create(fs, splitFile,
        new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION));
    int replication = job.getInt("mapred.submit.replication", 10);
    fs.setReplication(splitFile, (short)replication);
    writeSplitHeader(out);
    return out;
  }

If I understand correctly, the net functional effect of this approach is that

-       The initial write pipeline is set up with 'dfs.replication' nodes (i.e. 3)

-       The namenode triggers additional inter-datanode replications in the background (as it detects the blocks as "under-replicated").

I'm assuming this is intentional?  Alternatively, if the mapred.submit.replication was specified on initial create, the write pipeline would be significantly larger.

The reason I noticed is that we had inadvertently specified mapred.submit.replication as *less than* dfs.replication in our configuration, which caused a bunch of excess replica pruning (and ultimately IOExceptions in our datanode logs).
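
To make the two cases concrete, here is a minimal sketch of the arithmetic, assuming the behavior described above. The class and method names are hypothetical and are not Hadoop APIs; the sketch only classifies what the namenode would have to do after the setReplication() call.

```java
// Illustrative only: SubmitReplicationEffect is a hypothetical name, not a Hadoop class.
final class SubmitReplicationEffect {

    /** Replicas the namenode must add (positive) or prune (negative) after setReplication(). */
    static int replicaDelta(int dfsReplication, int submitReplication) {
        return submitReplication - dfsReplication;
    }

    /** Describes the background work implied by the two configuration values. */
    static String describe(int dfsReplication, int submitReplication) {
        int delta = replicaDelta(dfsReplication, submitReplication);
        if (delta > 0) {
            return "under-replicated: " + delta + " replica(s) copied in the background";
        } else if (delta < 0) {
            return "over-replicated: " + (-delta) + " excess replica(s) pruned";
        }
        return "no change";
    }
}
```

With dfs.replication = 3 and mapred.submit.replication = 10 the delta is +7 (background copies); with the values reversed in size, as in our misconfiguration, the delta is negative and replicas are pruned.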

Thanks,
Eric


RE: mapred replication

Posted by "Sirianni, Eric" <Er...@netapp.com>.
I should have mentioned that the IOException is from the DataNode (if not obvious from the stacktrace :)).  The DFSClient continues along happily by fetching the block from a different replica (as one would expect).


RE: mapred replication

Posted by "Sirianni, Eric" <Er...@netapp.com>.
Thanks Chris and others for the detailed explanation.

I was aware of the basic rationale behind having a higher replication factor for mapred job files - thanks for taking the time to elaborate and share with those on this list.

After thinking about it a bit more offline, I too speculated that the choice of altering the rep factor post file-create was to limit the size of the client write pipeline and use background inter-datanode replication to create the additional replicas.  Thanks for confirming that intuition.

Regarding the IOExceptions, here is the stack trace in question:

WARN org.apache.hadoop.hdfs.server.datanode.DataNode: DatanodeRegistration(10.63.150.50:50010, id=DS-1611001133, infoPort=50075, ipcPort=50020):Got exception while serving blk_7363978388743975861_1030 to /10.63.150.49:
java.io.IOException: Block blk_7363978388743975861_1030 is not valid.
        at org.apache.hadoop.hdfs.server.datanode.FSDataset.getBlockFile(FSDataset.java:1059)
        at org.apache.hadoop.hdfs.server.datanode.FSDataset.getLength(FSDataset.java:1022)
        at org.apache.hadoop.hdfs.server.datanode.FSDataset.getVisibleLength(FSDataset.java:1032)
        at org.apache.hadoop.hdfs.server.datanode.BlockSender.<init>(BlockSender.java:115)
        at org.apache.hadoop.hdfs.server.datanode.DataXceiver.readBlock(DataXceiver.java:194)
        at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:104)
        at java.lang.Thread.run(Thread.java:679)

I also speculated that this is due to the way that invalidates are processed asynchronously at the namenode.  A quick look at the chooseExcessReplicates()->addToInvalidates() path seems to indicate that the NameNode does not actually remove the pruned replica from the BlocksMap until the subsequent blockReport is received.  This can leave a substantial window where the NameNode can return bogus replica locations to clients.

There is another code path, FSNamesystem.invalidateBlock(), that does proactively update the BlocksMap (via FSNamesystem.removeStoredBlock()) after updating the recentInvalidateSets.  Perhaps the excess replica pruning path should include such a BlocksMap update as well?  Or even better, just push the common BlocksMap removal into the addToInvalidates() method so that all callers get that behavior.
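
To illustrate the window I have in mind, here is a toy model. It is not the real FSNamesystem code: ToyNameNode and its fields are hypothetical, and only the method names echo the ones mentioned above. It contrasts a "lazy" variant, where a pruned replica stays in the block map until the next block report, with an "eager" variant that removes it inside addToInvalidates().

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model only: a hypothetical stand-in, not the real namenode data structures.
final class ToyNameNode {
    private final Map<String, Set<String>> blocksMap = new HashMap<>();          // block -> datanodes
    private final Map<String, Set<String>> pendingInvalidates = new HashMap<>(); // datanode -> blocks
    private final boolean eagerRemoval;

    ToyNameNode(boolean eagerRemoval) {
        this.eagerRemoval = eagerRemoval;
    }

    void addReplica(String block, String datanode) {
        blocksMap.computeIfAbsent(block, b -> new HashSet<>()).add(datanode);
    }

    /** Queues a replica for deletion; the eager variant also drops its location immediately. */
    void addToInvalidates(String block, String datanode) {
        pendingInvalidates.computeIfAbsent(datanode, d -> new HashSet<>()).add(block);
        if (eagerRemoval) {
            removeStoredBlock(block, datanode);
        }
    }

    /** A block report drops every location the datanode no longer claims to hold. */
    void processBlockReport(String datanode, Set<String> reportedBlocks) {
        for (Map.Entry<String, Set<String>> entry : blocksMap.entrySet()) {
            if (!reportedBlocks.contains(entry.getKey())) {
                entry.getValue().remove(datanode);
            }
        }
        pendingInvalidates.remove(datanode);
    }

    /** Locations a client would be handed for this block right now. */
    Set<String> getLocations(String block) {
        Set<String> locations = blocksMap.get(block);
        return locations == null ? new HashSet<String>() : new HashSet<String>(locations);
    }

    private void removeStoredBlock(String block, String datanode) {
        Set<String> locations = blocksMap.get(block);
        if (locations != null) {
            locations.remove(datanode);
        }
    }
}
```

In the lazy variant, getLocations() keeps returning the pruned datanode between addToInvalidates() and processBlockReport(), which is the window in which a client can be sent to a replica that has already been deleted.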

Maybe I'm missing something - is there a legitimate reason for the NameNode to keep a replica's metadata in the BlocksMap after it has already decided to invalidate said replica?

Eric


Re: mapred replication

Posted by Robert Evans <ev...@yahoo-inc.com>.
Without the stack trace of the exceptions it is hard to tell.  The pruning
is asynchronous, but so is a node crashing with a replica on it.  The
client is supposed to detect this situation and find a new replica that
works.  I am not that familiar with the code, but I believe in some if not
all of these cases it will log the exception to indicate that something
bad happened, but it recovered.
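
As a rough sketch of the recovery behavior I mean (this is not the actual DFSClient code; ReplicaFailover, ReplicaReader and readWithFailover are hypothetical names), the client walks the replica locations it was given, logs a failure, and moves on to the next one:

```java
import java.io.IOException;
import java.util.List;

// Hypothetical sketch: these names are illustrative, not DFSClient APIs.
final class ReplicaFailover {

    /** Reads a block from one specific datanode; may fail if the replica is gone. */
    interface ReplicaReader {
        byte[] read(String datanode) throws IOException;
    }

    /** Tries each location in order and returns the first successful read. */
    static byte[] readWithFailover(List<String> locations, ReplicaReader reader) throws IOException {
        IOException last = null;
        for (String datanode : locations) {
            try {
                return reader.read(datanode);
            } catch (IOException e) {
                // Log and fall through to the next replica.
                System.err.println("WARN: failed to read from " + datanode + ": " + e.getMessage());
                last = e;
            }
        }
        throw new IOException("no usable replica among " + locations, last);
    }
}
```

The read only fails if every listed replica is unusable; a single stale location costs one logged exception and a retry.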

--Bobby


Re: mapred replication

Posted by Jay Vyas <ja...@gmail.com>.
Why should this lead to an IOException?  Is it because the pruning of
replicas is asynchronous and the datanodes try to access nonexistent
files?  If so, that seems like a pretty major bug.


-- 
Jay Vyas
http://jayunit100.blogspot.com

Re: mapred replication

Posted by Chris Nauroth <cn...@hortonworks.com>.
Hi Eric,

Yes, this is intentional.  The job.xml file and the job jar file get read
from every node running a map or reduce task.  Because of this, using a
higher than normal replication factor on these files improves locality.
 More than 3 task slots will have access to local replicas.  These files
tend to be much smaller than the actual data read by a job, so there tends
to be little harm done in terms of disk space consumption.

Why not create the file initially with 10 replicas instead of creating it
with 3 and then dialing up?  I imagine this is so that job submission
doesn't block on a synchronous write to a long pipeline.  The extra
replicas aren't necessary for correctness, and a long-running job will get
the locality benefits in the long term once more replicas are created in
the background.

I recommend submitting a new jira describing the problem that you saw.  We
probably can handle this better, and a jira would be a good place to
discuss the trade-offs.  A few possibilities:

Log a warning if mapred.submit.replication < dfs.replication.
Skip resetting replication if mapred.submit.replication <= dfs.replication.
Fail with error if mapred.submit.replication < dfs.replication.
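
For discussion, the three possibilities could be sketched as one small helper. This is only an illustration: SubmitReplicationPolicy, its Mode enum and resolve() are hypothetical and do not exist in Hadoop, and a real patch would use the project's logging rather than System.err.

```java
import java.util.OptionalInt;

// Hypothetical helper, not part of Hadoop.
final class SubmitReplicationPolicy {

    enum Mode { WARN, SKIP, FAIL }

    /**
     * Returns the replication to pass to setReplication(), or empty if the
     * reset should be skipped and the file left at dfs.replication.
     */
    static OptionalInt resolve(int submitReplication, int dfsReplication, Mode mode) {
        if (mode == Mode.SKIP && submitReplication <= dfsReplication) {
            return OptionalInt.empty();
        }
        if (submitReplication < dfsReplication) {
            String msg = "mapred.submit.replication (" + submitReplication
                    + ") < dfs.replication (" + dfsReplication + ")";
            if (mode == Mode.FAIL) {
                throw new IllegalArgumentException(msg);
            }
            System.err.println("WARN: " + msg);
        }
        return OptionalInt.of(submitReplication);
    }
}
```

The caller would invoke setReplication() only when the result is present, so the SKIP mode avoids triggering excess replica pruning altogether.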

Chris Nauroth
Hortonworks
http://hortonworks.com/


