Posted to common-user@hadoop.apache.org by Aviad sela <se...@gmail.com> on 2009/09/09 18:06:16 UTC

MultipleTextOutputFormat splitting output into different directories.

I am using Hadoop 0.19.1

I am attempting to split an input into multiple directories.
I don't know in advance how many directories exist.
I don't know in advance what the directory depth is.
I expect that under each such directory there will be a file containing all available
records that share the same key permutation found in the job.

Where each reducer currently produces a single output file, e.g. PART-0001,
I would like to create as many directories as necessary, following this pattern:

               key1/key2/.../keyN/PART-0001

where each "key?" may take a different value for each input record.
Different records may therefore request different paths:
              key1a/key2b/PART-0001
              key1c/key2d/key3e/PART-0001
To keep it simple, within a single job we may expect the same depth from every
record.

I assume the input records are such that each reducer will produce several
hundred of these directories.
(This of course depends strongly on the semantics of the input records.)


The map part reads a record and, following some logic, assigns a key such as:
KEY_A, KEY_B
The map value is the original input line.
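For illustration, here is a minimal sketch of such a mapper using the 0.19 mapred
API; the class name and the way the key parts are derived from the line are
assumptions, since the original logic is not shown in this post:

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    public class KeyAssigningMapper extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, Text> {

        private final Text outKey = new Text();

        public void map(LongWritable offset, Text line,
                        OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            // Hypothetical key derivation: join the first two tab-separated
            // fields with a comma, e.g. "KEY_A,KEY_B".
            String[] fields = line.toString().split("\t");
            outKey.set(fields[0].trim() + "," + fields[1].trim());
            // The value is the original input line, as described above.
            output.collect(outKey, line);
        }
    }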


For the reducer part I use the IdentityReducer, and have set:

    jobConf.setReducerClass(IdentityReducer.class);

    jobConf.setOutputFormat(MyTextOutputFormat.class);


MyTextOutputFormat extends MultipleTextOutputFormat and implements:

    protected String generateFileNameForKeyValue(K key, V value, String name)
    {
        String[] keyParts = key.toString().split(",");
        Path finalPath = null;
        // Build the directory structure from the key parts.
        for (int i = 0; i < keyParts.length; i++)
        {
            String part = keyParts[i].trim();
            if (!"".equals(part))
            {
                if (null == finalPath)
                    finalPath = new Path(part);
                else
                    finalPath = new Path(finalPath, part);
            }
        } // end of for

        // Append the leaf file name (e.g. part-00000) under the key directories.
        String fileName = generateLeafFileName(name);
        finalPath = new Path(finalPath, fileName);

        return finalPath.toString();
    } // generateFileNameForKeyValue
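For a key of "KEY_A,KEY_B" and the default leaf name, this returns a relative path
such as:

    KEY_A/KEY_B/part-00000

which MultipleTextOutputFormat appears to resolve under the task attempt's work
directory, hence the _temporary/_attempt_... path seen below.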
During execution I have seen that the reduce attempts do create the following
path under the output path:

"/user/hadoop/test_rep01/_temporary/_attempt_200909080349_0013_r_000000_0/KEY_A/KEY_B/part-00000"

However, the file was empty.



The job fails at the end with the following exceptions found in the task log:

2009-09-09 11:19:49,653 INFO org.apache.hadoop.hdfs.DFSClient: Exception in createBlockOutputStream java.io.IOException: Bad connect ack with firstBadLink 9.148.30.71:50010
2009-09-09 11:19:49,654 INFO org.apache.hadoop.hdfs.DFSClient: Abandoning block blk_-6138647338595590910_39383
2009-09-09 11:19:55,659 WARN org.apache.hadoop.hdfs.DFSClient: DataStreamer Exception: java.io.IOException: Unable to create new block.
        at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.nextBlockOutputStream(DFSClient.java:2722)
        at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.access$2000(DFSClient.java:1996)
        at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream$DataStreamer.run(DFSClient.java:2183)

2009-09-09 11:19:55,659 WARN org.apache.hadoop.hdfs.DFSClient: Error Recovery for block blk_-6138647338595590910_39383 bad datanode[1] nodes == null
2009-09-09 11:19:55,660 WARN org.apache.hadoop.hdfs.DFSClient: Could not get block locations. Source file "/user/hadoop/test_rep01/_temporary/_attempt_200909080349_0013_r_000002_0/KEY_A/KEY_B/part-00002" - Aborting...
2009-09-09 11:19:55,686 WARN org.apache.hadoop.mapred.TaskTracker: Error running child
java.io.IOException: Bad connect ack with firstBadLink 9.148.30.71:50010
        at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.createBlockOutputStream(DFSClient.java:2780)
        at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.nextBlockOutputStream(DFSClient.java:2703)
        at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.access$2000(DFSClient.java:1996)
        at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream$DataStreamer.run(DFSClient.java:2183)
2009-09-09 11:19:55,688 INFO org.apache.hadoop.mapred.TaskRunner: Runnning cleanup for the task


The command line also shows:

09/09/09 11:24:06 INFO mapred.JobClient: Task Id : attempt_200909080349_0013_r_000003_2, Status : FAILED
java.io.IOException: Bad connect ack with firstBadLink 9.148.30.80:50010
        at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.createBlockOutputStream(DFSClient.java:2780)
        at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.nextBlockOutputStream(DFSClient.java:2703)
        at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.access$2000(DFSClient.java:1996)
        at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream$DataStreamer.run(DFSClient.java:2183)



Any ideas how to support such a scenario?

Re: MultipleTextOutputFormat splitting output into different directories.

Posted by Aviad sela <se...@gmail.com>.
Indeed, the DataNode exception relates to too many open files, although it is of a
different type from the one you mentioned. I have appended the error at the
bottom.

Since I am not familiar with the code, the underlying architecture, or the design
decisions, the following discussion is just my own thoughts; I am not trying to
make any definitive statement :-)

As listed in my first mail, every second reducer reports an error related to a
socket problem. The other reducers report the same source of error, but their
stack traces omit any mention of sockets. Because of this difference I think that
some of the reducers work against the local HDFS node and others against a
remote instance.

I have seen the job reach 100% map and 100% reduce, at which point all files
have been created under the _temporary directory (under the target output
directory). I assume that each reducer writes to its local HDFS node, and that the
failure occurs during finalization, when all the data is moved into the target
output directory. I guess this finalization should be "atomic", in order to make
sure a reducer's "attempt" is committed as a whole. The common usage of
reducer output, in which there is a single output file, makes this "atomic"
finalization simple.

Still, I am puzzled why this should cause a failure, because I expect the DFS
engine to rename the _temporary files to their final names without copying them
over the network. In any case, since every resource is limited, the DFS engine
might need to take this into account. One such solution would be to iterate over
all the waiting temporary files, sharing the available number of sockets among
them.

Regarding your remark about using another approach:
This is always a possibility, yet we believe that splitting the data into different
files during the reduce phase may offer significant advantages in subsequent
processing. Just consider what is required to retrieve a single key otherwise: a
deep scan through all the files.


IOException:

WARN org.apache.hadoop.hdfs.server.datanode.DataNode: java.io.IOException: Cannot run program "bash": java.io.IOException: error=24, Too many open files
        at java.lang.Throwable.(Throwable.java:80)
        at java.lang.ProcessBuilder.start(ProcessBuilder.java:449)
        at org.apache.hadoop.util.Shell.runCommand(Shell.java:149)
        at org.apache.hadoop.util.Shell.run(Shell.java:134)
        at org.apache.hadoop.fs.DF.getCapacity(DF.java:63)
        at org.apache.hadoop.hdfs.server.datanode.FSDataset$FSVolume.getCapacity(FSDataset.java:338)
        at org.apache.hadoop.hdfs.server.datanode.FSDataset$FSVolumeSet.getCapacity(FSDataset.java:503)
        at org.apache.hadoop.hdfs.server.datanode.FSDataset.getCapacity(FSDataset.java:700)
        at org.apache.hadoop.hdfs.server.datanode.DataNode.offerService(DataNode.java:680)
        at org.apache.hadoop.hdfs.server.datanode.DataNode.run(DataNode.java:1141)
        at java.lang.Thread.run(Thread.java:735)
Caused by: java.io.IOException: java.io.IOException: error=24, Too many open files
        at java.lang.Throwable.(Throwable.java:67)
        at java.lang.UNIXProcess





Re: MultipleTextOutputFormat splitting output into different directories.

Posted by Jingkei Ly <jl...@googlemail.com>.
I think I had similar sorts of errors when trying to write via hundreds of open
OutputStreams to HDFS - I presume MultipleOutputFormat has to do the same at
each reducer if you give it many output files to create, as in your use case.

I think the problem stems from having many concurrent sockets open. You can
mitigate it by adding dfs.datanode.max.xcievers to hadoop-site.xml and setting a
higher value (if you check the DataNode logs you might find errors confirming
this: java.io.IOException: xceiverCount 257 exceeds the limit of concurrent
xcievers 256). However, I think this setting carries a memory overhead, so you
can't keep increasing the value indefinitely - is there another way you can
approach this without needing so many output files per reducer?
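
For reference, a sketch of what that property entry might look like in each
DataNode's hadoop-site.xml (the value shown is only an illustrative choice, not a
recommendation):

    <property>
      <name>dfs.datanode.max.xcievers</name>
      <value>2048</value>
    </property>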


Re: MultipleTextOutputFormat splitting output into different directories.

Posted by Alejandro Abdelnur <tu...@gmail.com>.
Using MultipleOutputs (
http://hadoop.apache.org/common/docs/r0.19.0/api/org/apache/hadoop/mapred/lib/MultipleOutputs.html
) you can split the data into different files in the output directory.

After your job finishes you can move the files to different directories.

The benefit of doing this is that task failures and speculative execution will also
take these files into account, and your data will be consistent. If you write to
different directories yourself, you have to handle this by hand.
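
A minimal sketch of this approach with the 0.19 mapred API (not from the original
thread); the named output "keyed", the class name, and the flattening of the key
into an alphanumeric part name are illustrative assumptions:

    // In the driver, declare a multi named output once:
    //   MultipleOutputs.addMultiNamedOutput(jobConf, "keyed",
    //           TextOutputFormat.class, Text.class, Text.class);

    import java.io.IOException;
    import java.util.Iterator;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapred.lib.MultipleOutputs;

    public class KeySplittingReducer extends MapReduceBase
            implements Reducer<Text, Text, Text, Text> {

        private MultipleOutputs mos;

        public void configure(JobConf job) {
            mos = new MultipleOutputs(job);
        }

        public void reduce(Text key, Iterator<Text> values,
                           OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            // Named-output part names may only contain letters and digits,
            // so strip everything else from the key, e.g. "KEY_A,KEY_B" -> "KEYAKEYB".
            String part = key.toString().replaceAll("[^A-Za-z0-9]", "");
            while (values.hasNext()) {
                mos.getCollector("keyed", part, reporter).collect(key, values.next());
            }
        }

        public void close() throws IOException {
            mos.close();
        }
    }

The resulting files all land in the single job output directory, and can then be
moved into per-key directories once the job has completed, as described above.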

A


Re: MultipleTextOutputFormat splitting output into different directories.

Posted by Aviad sela <se...@gmail.com>.
Has anybody encountered or addressed such a problem?
Or does it seem to be an esoteric usage?



