Posted to common-user@hadoop.apache.org by amitsingh <am...@cse.iitb.ac.in> on 2008/12/10 20:12:34 UTC

File Splits in Hadoop

Hi,

I am stuck on a few questions about the following scenario.

1) Hadoop normally splits the input file into chunks of 64 MB and 
distributes them across the slaves (referred to as Psplits from now on).
a) Is there any way to specify the split criteria so that, for example, a 
huge 4 GB file is split into 40-odd files (Psplits) that respect record 
boundaries?
b) Is it even required that these physical splits (Psplits) obey record 
boundaries?

2) We can get the locations of these Psplits on HDFS as follows:
BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length); //FileInputFormat line 273
In FileInputFormat, for each blkLocations entry (Psplit), multiple logical 
splits (referred to as Lsplits from now on) are created based on a heuristic 
for the number of mappers.
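
For reference, the "heuristic for the number of mappers" can be sketched roughly as below. This is a minimal sketch written from memory of the old org.apache.hadoop.mapred.FileInputFormat, where the split size is max(minSize, min(goalSize, blockSize)) and goalSize is the total input size divided by the requested number of maps. The names SplitSizeSketch and countSplits are made up for illustration, and the small slack the real code gives the last split is ignored, so please check it against the source of your Hadoop version.

```java
// Sketch of the split-size rule, assuming the old mapred FileInputFormat formula.
final class SplitSizeSketch {

    // splitSize = max(minSize, min(goalSize, blockSize))
    static long computeSplitSize(long goalSize, long minSize, long blockSize) {
        return Math.max(minSize, Math.min(goalSize, blockSize));
    }

    // Number of splits for one file (hypothetical helper; rounds up and
    // ignores the slack the real code allows for the last split).
    static long countSplits(long fileLen, long splitSize) {
        return (fileLen + splitSize - 1) / splitSize;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long fileLen = 4096 * mb;   // the 4 GB file from the question
        long blockSize = 64 * mb;   // HDFS block size from the question
        long minSize = 1;           // assumed default minimum split size
        int requestedMaps = 40;     // the hint, e.g. JobConf.setNumMapTasks(40)

        long goalSize = fileLen / requestedMaps;
        long splitSize = computeSplitSize(goalSize, minSize, blockSize);
        System.out.println("splitSize=" + (splitSize / mb) + " MB, splits="
                + countSplits(fileLen, splitSize));
    }
}
```

With these numbers the hint of 40 maps is overruled by the block size and the file ends up in 64 splits. Raising the minimum split size to about 103 MB (mapred.min.split.size in this era of Hadoop, if I remember correctly) would give 40 splits. Either way the split size is purely a byte count; respecting record boundaries is left to the record reader.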

Q) How is the following situation handled in TextInputFormat, which reads 
line by line?
    i) The input file is split, as described in step 1, into more than 2 parts.
    ii) Suppose there is a line of text that starts near the end of 
Psplit-i and ends in Psplit-i+1 (say Psplit2 and Psplit3).
    iii) Which mapper gets this line spanning multiple Psplits (mapper_i 
or mapper_i+1)?
    iv) I went through the FileInputFormat code; Lsplits are created only 
within a particular Psplit, not across Psplits. Why so?

Q) In short, if one has to read arbitrary objects (not lines), how does one 
handle records that are partially in one Psplit and partially in another?
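
One common convention for this, and as far as I know the one the line reader behind TextInputFormat follows, is ownership by start position: a split owns every record that starts inside it, reads that record to its end even if the end lies in the next split, and a split that does not begin at offset 0 first discards the partial record it lands in. The sketch below applies that rule to lines in an in-memory byte array. SplitLineSketch, readSplit and readAllSplits are hypothetical names, and it is an illustration of the convention rather than Hadoop's actual code.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class SplitLineSketch {

    // Returns the lines owned by the split [start, end) of data.
    static List<String> readSplit(byte[] data, int start, int end) {
        int pos = start;
        if (start != 0) {
            // Back up one byte and discard everything through the next newline.
            // If the split begins exactly at a line start, only the previous
            // line's newline is discarded and nothing is lost.
            pos = start - 1;
            while (pos < data.length && data[pos] != '\n') {
                pos++;
            }
            pos++; // step past the newline
        }
        List<String> lines = new ArrayList<>();
        // A line is read whenever it STARTS inside the split, and it is read
        // to its end even when that end lies beyond the split.
        while (pos < end && pos < data.length) {
            int lineEnd = pos;
            while (lineEnd < data.length && data[lineEnd] != '\n') {
                lineEnd++;
            }
            lines.add(new String(data, pos, lineEnd - pos, StandardCharsets.UTF_8));
            pos = lineEnd + 1;
        }
        return lines;
    }

    // Reads every split of the given size in turn, as separate mappers would.
    static List<String> readAllSplits(byte[] data, int splitSize) {
        List<String> all = new ArrayList<>();
        for (int start = 0; start < data.length; start += splitSize) {
            int end = Math.min(start + splitSize, data.length);
            all.addAll(readSplit(data, start, end));
        }
        return all;
    }

    public static void main(String[] args) {
        byte[] data = "alpha\nbravo\ncharlie\ndelta\n".getBytes(StandardCharsets.UTF_8);
        System.out.println(readAllSplits(data, 8));
    }
}
```

In terms of the question, the line that starts in Psplit2 and ends in Psplit3 goes to the mapper of Psplit2, and the mapper of Psplit3 skips its tail. The same rule carries over to arbitrary records only if a reader dropped at a random offset can find the next record start, for example through a sync marker; a plain serialized object stream offers no such marker.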

--Amit




Re: Re: Re: File Splits in Hadoop

Posted by amitsingh <am...@cse.iitb.ac.in>.
Hi ,

After some debugging it seems that I found the problem.

In parseARC(...) the length for the gzip was set incorrectly (for the last 
gzip in the split):
if (totalRead > splitEnd) {
               break;
} ... ideally it should have allowed reading on into the next block instead 
of setting the length to offset-size for the last block.

So I modified next() to read directly from the input stream instead of from 
a byte array populated using offset and length.


Anyway, is this the correct way to read records spanning multiple splits?
(Currently it is taking quite some time to process, at 100% CPU :(. 
I am investigating what the cause could be.)

So for me the conclusion seems to be that reads spanning multiple physical 
blocks are transparent.

***I guess this could have been handled in parseARC in a cleaner way***

 


*********************************************************************************************
modified srcCode
*********************************************************************************************

public boolean next(Text key, BytesWritable value) throws IOException {
        long start = 0;
        long len = 0;
        try {
            LOG.info("NEXT !!!!!!!!!!!!!! ");
            int index = recordIndex++;
            if (index >= startLens.length) {
                LOG.info("BAD ");
                return false;
            } else {
                LOG.info("GOOD");
            }

            start = startLens[index][0];
            len = startLens[index][1];
            byte[] zipbytes = new byte[(int) len];

            LOG.info("index" + index + "\tstartLens.length" + startLens.length
                    + "\tstart:" + start + "\tlen" + len);

            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            ByteArrayInputStream zipin = new ByteArrayInputStream(zipbytes);
            GZIPInputStream zin = null;
            in.seek(start);                                    //Change
            if (index == startLens.length - 1) {               //Change
                // Last gzip in the split: decompress straight from the file
                // stream so the read can continue past the split end.
                LOG.info("zin = new GZIPInputStream(in);");    //Change
                zin = new GZIPInputStream(in);                 //Change
            } else {                                           //Change
                // readFully, because a single read() may return fewer bytes
                // than the array holds.
                in.readFully(zipbytes);                        //Change
                zin = new GZIPInputStream(zipin);              //Change
                LOG.info("zin = new GZIPInputStream(zipin);"); //Change
            }

            int gzipRead = -1;
            int totalGzipRead = 0;
            baos.reset();
            try {
                while ((gzipRead = zin.read(buffer, 0, buffer.length)) != -1) {
                    baos.write(buffer, 0, gzipRead);
                    totalGzipRead += gzipRead;
                }
            } catch (Exception ex) {
                ex.printStackTrace();
                LOG.info(ex.toString() + "\nBANGstart:" + start + "\tlen" + len);
                // LOG.equals(...) only compared objects and logged nothing.
                LOG.error(StringUtils.stringifyException(ex));
            }

            byte[] pageBytes = baos.toByteArray();
            baos.close();
            // Closing zin would also close the shared stream "in" when zin
            // wraps it directly, so only close it in the byte-array case.
            if (index != startLens.length - 1) {               //Change
                zin.close();                                   //Change
            }                                                  //Change
            zipin.close();

            Text keyText = (Text) key;
            keyText.set("" + index);
            BytesWritable valueBytes = (BytesWritable) value;
            valueBytes.set(pageBytes, 0, pageBytes.length);

            return true;
        } catch (Exception e) {
            e.printStackTrace();
            LOG.info(e.toString() + "start:" + start + "\tlen" + len);
            LOG.error(StringUtils.stringifyException(e));
            return false;
        }

    }
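
One thing to watch wherever the record reader fills zipbytes from the HDFS stream: a single read(byte[]) call is allowed to return fewer bytes than the array holds, especially when the data crosses a block boundary or comes over the network. DataInputStream.readFully (FSDataInputStream extends DataInputStream) keeps reading until the array is full. The sketch below shows the difference using a made-up TrickleStream that hands out at most 3 bytes per call; it stands in for a slow stream and is not a Hadoop class.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

final class ReadFullySketch {

    // Hypothetical stand-in for a slow or remote stream: at most 3 bytes per call.
    static final class TrickleStream extends InputStream {
        private final ByteArrayInputStream source;

        TrickleStream(byte[] data) {
            this.source = new ByteArrayInputStream(data);
        }

        @Override
        public int read() {
            return source.read();
        }

        @Override
        public int read(byte[] b, int off, int len) {
            return source.read(b, off, Math.min(len, 3));
        }
    }

    // One plain read(): returns how many bytes actually arrived.
    static int singleRead(byte[] data) {
        try {
            return new TrickleStream(data).read(new byte[data.length]);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // readFully(): loops internally until the whole array is filled.
    static byte[] fullRead(byte[] data) {
        byte[] target = new byte[data.length];
        try {
            new DataInputStream(new TrickleStream(data)).readFully(target);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return target;
    }

    public static void main(String[] args) {
        byte[] data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
        System.out.println("single read() filled " + singleRead(data) + " of "
                + data.length + " bytes");
        System.out.println("readFully() filled " + fullRead(data).length + " bytes");
    }
}
```

If the array is only partly filled, the rest stays zero and the decompressor sees what looks like a corrupt or truncated stream, so a short read can produce the same kind of error as a wrong length.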


amitsingh wrote:
> Thanks for the discussion, Taran.
>
> The problem still persists.
> What should be done if I have a record that spans multiple PSplits 
> (physical splits on HDFS)?
> What happens if we try to read beyond a PSplit?
> Is the next read done transparently from the next block of the same 
> file (which might not be on the same machine), or is the next block on 
> the local disk (which may not belong to the same file) read instead?
>
> If it is the former, I guess things should have worked fine (surprisingly 
> they aren't!! I am goofing it up somewhere).
> If it is the latter, then I have no idea how to tackle this. (Any help 
> would be highly appreciated.)
>
>
>
> ************************************************************************************************** 
>
>
> I tried running a simple program in which I created a sample GZip file 
> by serializing records:
>           // serialize the objects sarah and sam
>           FileOutputStream fos = new 
> FileOutputStream("/home/amitsingh/OUTPUT/out.bin");
>           GZIPOutputStream gz = new GZIPOutputStream(fos);
>           ObjectOutputStream oos = new ObjectOutputStream(gz);
>
>           for (int i = 0; i < 500000; i++) {
>               Employee sam = new Employee(i + "name", i,   i + 50000);
>            // 3 fields , 2 int , 1 string
>               oos.writeObject(sam);
>           }
>           oos.flush();
>           oos.close();
>
> Now if I just run a simple map reduce on this binary file, it gives the 
> exception java.io.EOFException: Unexpected end of ZLIB input stream
> It creates 2 splits:
> Split 1: hdfs://localhost:54310/user/amitsingh/out1: start:0 
> length:1555001 hosts: sandpiper ,bytesRemaining: 1555001
> Split 2:  hdfs://localhost:54310/user/amitsingh/out1: start1555001 
> length:1555001 hosts: sandpiper ,
>
> For Map1 --> Split1 I get java.io.EOFException: Unexpected end of ZLIB 
> input stream [for startLens[0]  start:0    len1556480]
> For Map2 --> no valid GZip is found, as startLens is empty.
>
> I am not sure why in Map1 the len is 1556480 and not 3110002 (the entire 
> file), as there is ONLY one GZip and that is the entire file.
> Any guidance would be of great help.
>
>
>
>
>
>
>
> ************************************************************************************************************** 
>
> Source code
> ************************************************************************************************************** 
>
>
> package org.apache.hadoop.mapred;
> import java.io.ByteArrayInputStream;
> import java.io.ByteArrayOutputStream;
> import java.io.IOException;
> import java.util.ArrayList;
> import java.util.List;
> import java.util.zip.GZIPInputStream;
>
> import org.apache.commons.logging.Log;
> import org.apache.commons.logging.LogFactory;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.FSDataInputStream;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.io.BytesWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.util.ReflectionUtils;
> import org.apache.hadoop.util.StringUtils;
>
> public class CustomGzipRecordReader implements
>        RecordReader<Text, BytesWritable> {
>
>    public static final Log LOG = LogFactory
>            .getLog(CustomGzipRecordReader.class);
>
>    protected Configuration conf;
>    protected long splitStart = 0;
>    protected long pos = 0;
>    protected long splitEnd = 0;
>    protected long splitLen = 0;
>    protected long fileLen = 0;
>    protected FSDataInputStream in;
>    protected int recordIndex = 0;
>    protected long[][] startLens;
>    protected byte[] buffer = new byte[4096];
>
>    private static byte[] MAGIC = { (byte) 0x1F, (byte) 0x8B };
>
>    // Check the split and populate startLens with the offsets at which a
>    // gzip stream starts in this split.
>    private void parseArcBytes() throws IOException {
>
>        long totalRead = in.getPos();
>        byte[] buffer = new byte[4096];
>        List<Long> starts = new ArrayList<Long>();
>
>        int read = -1;
>        while ((read = in.read(buffer)) > 0) {
>
>            for (int i = 0; i < (read - 1); i++) {
>
>                if ((buffer[i] == (byte) 0x1F)
>                        && (buffer[i + 1] == (byte) 0x8B)) {
>                    long curStart = totalRead + i;
>                    in.seek(curStart);
>                    byte[] zipbytes = null;
>                    try {
>                        zipbytes = new byte[32];
>                        in.read(zipbytes);
>                        ByteArrayInputStream zipin = new 
> ByteArrayInputStream(
>                                zipbytes);
>                        GZIPInputStream zin = new GZIPInputStream(zipin);
>                        zin.close();
>                        zipin.close();
>                        starts.add(curStart);
>                        LOG.info("curStart: " + (curStart));
>                    } catch (Exception e) {
>                        LOG.info("Ignoring position: " + (curStart));
>                        continue;
>                    }
>                }
>            }
>
>            totalRead += read;
>            in.seek(totalRead);
>            if (totalRead > splitEnd) {
>                break;
>            }
>        }
>
>        startLens = new long[starts.size()][2];
>        for (int i = 0; i < starts.size(); i++) {
>
>            long start = starts.get(i);
>            long length = ((i < starts.size() - 1) ? starts.get(i + 1)
>                    : totalRead)
>                    - start;
>            startLens[i][0] = start;
>            startLens[i][1] = length;
>            System.out.println("startLens[" + i + "][0] " + 
> startLens[i][0]
>                    + "\t\t" + startLens[i][1]);
>        }
>    }
>
>    public CustomGzipRecordReader(Configuration conf, FileSplit split)
>            throws IOException {
>
>        Path path = split.getPath();
>        FileSystem fs = path.getFileSystem(conf);
>        fileLen = fs.getFileStatus(split.getPath()).getLen();
>        this.conf = conf;
>        this.in = fs.open(split.getPath());
>        this.splitStart = split.getStart();
>        this.splitEnd = splitStart + split.getLength();
>        this.splitLen = split.getLength();
>        in.seek(splitStart);
>        parseArcBytes();
>
>    }
>
>    public void close() throws IOException {
>        this.in.close();
>    }
>
>    public Text createKey() {
>        return (Text) ReflectionUtils.newInstance(Text.class, conf);
>    }
>
>    public BytesWritable createValue() {
>        return (BytesWritable) 
> ReflectionUtils.newInstance(BytesWritable.class,
>                conf);
>    }
>
>    public long getPos() throws IOException {
>        return 0;
>    }
>
>    public float getProgress() throws IOException {
>
>        if (recordIndex == 0) {
>            return 0.0f;
>        } else {
>            // Progress is the fraction of records in this split that have
>            // been returned so far. Cast before dividing: integer division
>            // would yield 0 until the last record.
>            return Math.min(1.0f, (float) recordIndex / startLens.length);
>        }
>    }
>
>    public boolean next(Text key, BytesWritable value) throws 
> IOException {
>        long start = 0;
>        long len = 0;
>        try {
>            LOG.info("NEXT Called ");
>            int index = recordIndex++;
>            if (index >= startLens.length) {
>                LOG.info("BAD ");
>                return false;
>            } else {
>                LOG.info("GOOD");
>            }
>
>            start = startLens[index][0];
>            len = startLens[index][1];
>            byte[] zipbytes = new byte[(int) len];
>
>            LOG.info("start:" + start + "\tlen" + len);
>
>            in.seek(start);
>            in.read(zipbytes);
>
>            ByteArrayOutputStream baos = new ByteArrayOutputStream();
>            ByteArrayInputStream zipin = new 
> ByteArrayInputStream(zipbytes);
>            GZIPInputStream zin = new GZIPInputStream(zipin);
>
>            int gzipRead = -1;
>            int totalGzipRead = 0;
>            baos.reset();
>            try {
>                while ((gzipRead = zin.read(buffer, 0, buffer.length)) 
> != -1) {  // <--------SOURCE of exception
>                    baos.write(buffer, 0, gzipRead);
>                    totalGzipRead += gzipRead;
>                }
>            } catch (Exception ex) {
>                ex.printStackTrace();
>                LOG.info(ex.toString() + "\nstart:" + start + "\tlen" + len);
>                LOG.error(StringUtils.stringifyException(ex));
>            }
>
>            byte[] pageBytes = baos.toByteArray();
>            baos.close();
>            zin.close();
>            zipin.close();
>
> //            GZIPInputStream gs = new GZIPInputStream(
> //                    new ByteArrayInputStream(bytes.get()));
>
>            // ObjectInputStream ois = new ObjectInputStream(zin);
>            // for (int i = 0; i < 500000; i++) {
>            // Employee sarah = null;
>            // try {
>            // sarah = (Employee) ois.readObject();
>            // // LOG.info(sarah.printObject());
>            // } catch (ClassNotFoundException e) {
>            // // TODO Auto-generated catch block
>            // LOG.info(e + "start:" + start + "\tlen" + len);
>            // e.printStackTrace();
>            // }
>            // // sarah.print();
>            //
>            // }
>                       Text keyText = (Text) key;
>             keyText.set(""+index);
>             BytesWritable valueBytes = (BytesWritable) value;
>             valueBytes.set(pageBytes, 0, pageBytes.length);
>
>            return true;
>        } catch (Exception e) {
>            e.printStackTrace();
>            LOG.info(e.toString() + "start:" + start + "\tlen" + len);
>            LOG.error(StringUtils.stringifyException(e));
>            return false;
>        }
>
>    }
> }
>
> *********************************************************************************** 
>
>
> package org.apache.hadoop.mapred;
> import java.io.IOException;
> import org.apache.hadoop.io.BytesWritable;
> import org.apache.hadoop.io.Text;
> /**
> * An input format that reads custom gzip files.
> */
> public class CustomGzipInputFormat extends FileInputFormat<Text, 
> BytesWritable> {
>    public RecordReader<Text, BytesWritable> getRecordReader(InputSplit 
> split,
>            JobConf job, Reporter reporter) throws IOException {
>        reporter.setStatus(split.toString());
>        return new CustomGzipRecordReader(job, (FileSplit) split);
>    }
>
> }
>
> *********************************************************************************** 
>
>
>
> package org.apache.hadoop.examples;
>
> import java.io.ByteArrayInputStream;
> import java.io.IOException;
> import java.io.ObjectInputStream;
> import java.util.ArrayList;
> import java.util.List;
>
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.conf.Configured;
> import org.apache.hadoop.examples.WordCount.Reduce;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.io.BytesWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapred.CustomGzipInputFormat;
> import org.apache.hadoop.mapred.Employee;
> import org.apache.hadoop.mapred.FileInputFormat;
> import org.apache.hadoop.mapred.FileOutputFormat;
> import org.apache.hadoop.mapred.JobClient;
> import org.apache.hadoop.mapred.JobConf;
> import org.apache.hadoop.mapred.Mapper;
> import org.apache.hadoop.mapred.OutputCollector;
> import org.apache.hadoop.mapred.Reporter;
> import org.apache.hadoop.mapred.TextOutputFormat;
> import org.apache.hadoop.util.Tool;
> import org.apache.hadoop.util.ToolRunner;
>
> public class CustomObjectReader extends Configured implements Tool,
>        Mapper<Text, BytesWritable, Text, Text> {
>
>    private JobConf jobConf;
>
>    // public static final Log LOG = 
> LogFactory.getLog(ArcSegmentCreator.class);
>
>    public CustomObjectReader() {
>
>    }
>
>    public void configure(JobConf job) {
>        this.jobConf = job;
>    }
>
>    public void close() {
>    }
>
>    public CustomObjectReader(Configuration conf) {
>        setConf(conf);
>    }
>
>    public void map(Text key, BytesWritable bytes,
>            OutputCollector<Text, Text> output, Reporter reporter)
>            throws IOException {
>        ObjectInputStream ois = new ObjectInputStream(new 
> ByteArrayInputStream(
>                bytes.get()));
>        long count =0;
>        for (int i = 0; i < 500000; i++) {
>            Employee sarah = null;
>                      try {
>                sarah = (Employee) ois.readObject();               
>            } catch (ClassNotFoundException e) {
>                System.out.println("EXCEPTOPN" + e);
>                e.printStackTrace();
>            }
>           sarah.print();
>        }
>
>    }
>
>    public void readArcs(Path arcFiles, Path outDir) throws IOException {
>
>    }
>
>    public int run(String[] args) throws Exception {
>
>        String usage = "Usage: ArcReader <arcFiles>";
>
>        if (args.length < 2) {
>            System.err.println(usage);
>            return -1;
>        }
>
>
>
>        JobConf job = new JobConf(getConf(), CustomObjectReader.class);
>        job.setJobName("custom reader");
>
>        job.setInputFormat(CustomGzipInputFormat.class);
>        job.setMapperClass(CustomObjectReader.class);
>        job.setMapOutputKeyClass(Text.class);
>        job.setMapOutputValueClass(Text.class);
>
>        job.setOutputFormat(TextOutputFormat.class);
>        job.setOutputKeyClass(Text.class);
>        job.setOutputValueClass(Text.class);
>
>        job.setReducerClass(Reduce.class);
>
>        List<String> other_args = new ArrayList<String>();
>        for (int i = 0; i < args.length; ++i) {
>            try {
>                if ("-m".equals(args[i])) {
>                    job.setNumMapTasks(Integer.parseInt(args[++i]));
>                } else if ("-r".equals(args[i])) {
>                    job.setNumReduceTasks(Integer.parseInt(args[++i]));
>                } else {
>                    other_args.add(args[i]);
>                }
>            } catch (NumberFormatException except) {
>                System.out.println("ERROR: Integer expected instead of "
>                        + args[i]);
>                return printUsage();
>            } catch (ArrayIndexOutOfBoundsException except) {
>                System.out.println("ERROR: Required parameter missing from "
>                        + args[i - 1]);
>                return printUsage();
>            }
>        }
>        // Make sure there are exactly 2 parameters left.
>        if (other_args.size() != 2) {
>            System.out.println("ERROR: Wrong number of parameters: "
>                    + other_args.size() + " instead of 2.");
>            return printUsage();
>        }
>        FileInputFormat.setInputPaths(job, other_args.get(0));
>        FileOutputFormat.setOutputPath(job, new Path(other_args.get(1)));
>
>        JobClient.runJob(job);
>        return 0;
>
>    }
>
>    static int printUsage() {
>        System.out
>                .println("wordcount [-m <maps>] [-r <reduces>] <input> <output>");
>        ToolRunner.printGenericCommandUsage(System.out);
>        return -1;
>    }
>
>    public static void main(String[] args) throws Exception {
>        int res = ToolRunner.run(new Configuration(), new 
> CustomObjectReader(),
>                args);
>        System.exit(res);
>    }
>
> }
>
>
> *********************************************************************************** 
>
> package org.apache.hadoop.mapred;
>
> import java.io.FileInputStream;
> import java.io.FileOutputStream;
> import java.io.ObjectInputStream;
> import java.io.ObjectOutputStream;
> import java.io.Serializable;
> import java.util.zip.GZIPInputStream;
> import java.util.zip.GZIPOutputStream;
>
> public class Employee implements Serializable {
>    String name;
>    int age;
>    int salary;
>
>    public Employee(String name, int age, int salary) {
>        this.name = name;
>        this.age = age;
>        this.salary = salary;
>    }
>
>    public void print() {
>        System.out.println("Record for: " + name);
>        System.out.println("Name: " + name);
>        System.out.println("Age: " + age);
>        System.out.println("Salary: " + salary);
>    }
>      public String printObject() {
>        return "name" +name +"\tage" + age +"\tSalary: " + salary;
>    }
>
>    public static void main(String argv[]) throws Exception {
>        // create some objects
>        org.apache.hadoop.mapred.Employee sarah = new Employee("S. Jordan",
>                28, 56000);
>
>
>            // serialize the objects sarah and sam
>            FileOutputStream fos = new FileOutputStream(
>                    "/home/amitsingh/OUTPUT/out.bin");
>            GZIPOutputStream gz = new GZIPOutputStream(fos);
>            ObjectOutputStream oos = new ObjectOutputStream(gz);
>
>            for (int i = 0; i < 500000; i++) {
>                org.apache.hadoop.mapred.Employee sam = new Employee(i 
> + "MyNameIsGreat", i,
>                        i + 50000);
>                oos.writeObject(sam);
>            }
>            oos.flush();
>            oos.close();
>            //fos.close();
>
>            // deserialize objects sarah and sam
>            FileInputStream fis = new 
> FileInputStream("/home/amitsingh/OUTPUT/out.bin");
>            GZIPInputStream gs = new GZIPInputStream(fis);
>            ObjectInputStream ois = new ObjectInputStream(gs);
>            for (int i = 0; i < 10; i++) {
>                sarah = (org.apache.hadoop.mapred.Employee) 
> ois.readObject();
>                sarah.print();
>
>            }
>            ois.close();
>            //fis.close();
>        }
>  
> }
>


Re: Re: File Splits in Hadoop

Posted by amitsingh <am...@cse.iitb.ac.in>.
Thanks for the discussion, Taran.

The problem still persists.
What should be done if I have a record that spans multiple PSplits 
(physical splits on HDFS)?
What happens if we try to read beyond a PSplit?
Is the next read done transparently from the next block of the same file 
(which might not be on the same machine), or is the next block on the 
local disk (which may not belong to the same file) read instead?

If it is the former, I guess things should have worked fine (surprisingly 
they aren't!! I am goofing it up somewhere).
If it is the latter, then I have no idea how to tackle this. (Any help would 
be highly appreciated.)



**************************************************************************************************

I tried running a simple program in which I created a sample GZip file 
by serializing records:
           // serialize the objects sarah and sam
           FileOutputStream fos = new 
FileOutputStream("/home/amitsingh/OUTPUT/out.bin");
           GZIPOutputStream gz = new GZIPOutputStream(fos);
           ObjectOutputStream oos = new ObjectOutputStream(gz);

           for (int i = 0; i < 500000; i++) {
               Employee sam = new Employee(i + "name", i,   i + 50000);
            // 3 fields , 2 int , 1 string
               oos.writeObject(sam);
           }
           oos.flush();
           oos.close();

Now if I just run a simple map reduce on this binary file, it gives the 
exception java.io.EOFException: Unexpected end of ZLIB input stream
It creates 2 splits:
Split 1: hdfs://localhost:54310/user/amitsingh/out1: start:0 
length:1555001 hosts: sandpiper ,bytesRemaining: 1555001
Split 2:  hdfs://localhost:54310/user/amitsingh/out1: start1555001 
length:1555001 hosts: sandpiper ,

For Map1 --> Split1 I get java.io.EOFException: Unexpected end of ZLIB 
input stream [for startLens[0]  start:0    len1556480]
For Map2 --> no valid GZip is found, as startLens is empty.

I am not sure why in Map1 the len is 1556480 and not 3110002 (the entire 
file), as there is ONLY one GZip and that is the entire file.
Any guidance would be of great help.
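
A note on the numbers: 1556480 is 380 * 4096, the first multiple of the 4096-byte scan buffer past the split end at 1555001, which is consistent with the scan loop in the source code stopping once totalRead exceeds splitEnd and using totalRead as the end of the last gzip. The EOFException then follows because a single gzip stream can only be decompressed from its header through to its end; a prefix of it is a truncated stream. The sketch below reproduces just that effect with the JDK alone. TruncatedGzipSketch and its methods are made-up names, and the sample data merely imitates the shape of the records above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

final class TruncatedGzipSketch {

    // Compresses raw into one gzip stream, like the single out.bin file.
    static byte[] gzip(byte[] raw) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    // Decompresses to the end and reports either the size or the failure type.
    static String tryGunzip(byte[] compressed) {
        byte[] buf = new byte[4096];
        long total = 0;
        try (GZIPInputStream in =
                new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            int n;
            while ((n = in.read(buf)) != -1) {
                total += n;
            }
            return "ok, " + total + " bytes";
        } catch (IOException e) {
            return "failed: " + e.getClass().getName();
        }
    }

    // Illustrative records only; not the Employee objects from the post.
    static byte[] sampleData() {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 50000; i++) {
            sb.append(i).append("name,").append(i + 50000).append('\n');
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] whole = gzip(sampleData());
        byte[] firstHalf = Arrays.copyOf(whole, whole.length / 2);
        System.out.println("whole file: " + tryGunzip(whole));
        System.out.println("first half: " + tryGunzip(firstHalf));
    }
}
```

So the two-split layout cannot work for a file that is one gzip stream, whatever the record reader does inside its own split. The usual ways out are to let the reader of the first split keep reading past the split end, or to keep such files in a single split; as far as I know the old mapred FileInputFormat has an isSplitable(FileSystem, Path) hook that an input format can override to return false for that purpose.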







**************************************************************************************************************
Source code
**************************************************************************************************************

package org.apache.hadoop.mapred;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;

public class CustomGzipRecordReader implements
        RecordReader<Text, BytesWritable> {

    public static final Log LOG = LogFactory
            .getLog(CustomGzipRecordReader.class);

    protected Configuration conf;
    protected long splitStart = 0;
    protected long pos = 0;
    protected long splitEnd = 0;
    protected long splitLen = 0;
    protected long fileLen = 0;
    protected FSDataInputStream in;
    protected int recordIndex = 0;
    protected long[][] startLens;
    protected byte[] buffer = new byte[4096];

    private static byte[] MAGIC = { (byte) 0x1F, (byte) 0x8B };

    // Check the split and populate startLens with the offsets at which a
    // gzip stream starts in this split.
    private void parseArcBytes() throws IOException {

        long totalRead = in.getPos();
        byte[] buffer = new byte[4096];
        List<Long> starts = new ArrayList<Long>();

        int read = -1;
        while ((read = in.read(buffer)) > 0) {

            for (int i = 0; i < (read - 1); i++) {

                if ((buffer[i] == (byte) 0x1F)
                        && (buffer[i + 1] == (byte) 0x8B)) {
                    long curStart = totalRead + i;
                    in.seek(curStart);
                    byte[] zipbytes = null;
                    try {
                        zipbytes = new byte[32];
                        in.read(zipbytes);
                        ByteArrayInputStream zipin = new 
ByteArrayInputStream(
                                zipbytes);
                        GZIPInputStream zin = new GZIPInputStream(zipin);
                        zin.close();
                        zipin.close();
                        starts.add(curStart);
                        LOG.info("curStart: " + (curStart));
                    } catch (Exception e) {
                        LOG.info("Ignoring position: " + (curStart));
                        continue;
                    }
                }
            }

            totalRead += read;
            in.seek(totalRead);
            if (totalRead > splitEnd) {
                break;
            }
        }

        startLens = new long[starts.size()][2];
        for (int i = 0; i < starts.size(); i++) {

            long start = starts.get(i);
            long length = ((i < starts.size() - 1) ? starts.get(i + 1)
                    : totalRead)
                    - start;
            startLens[i][0] = start;
            startLens[i][1] = length;
            System.out.println("startLens[" + i + "][0] " + startLens[i][0]
                    + "\t\t" + startLens[i][1]);
        }
    }

    public CustomGzipRecordReader(Configuration conf, FileSplit split)
            throws IOException {

        Path path = split.getPath();
        FileSystem fs = path.getFileSystem(conf);
        fileLen = fs.getFileStatus(split.getPath()).getLen();
        this.conf = conf;
        this.in = fs.open(split.getPath());
        this.splitStart = split.getStart();
        this.splitEnd = splitStart + split.getLength();
        this.splitLen = split.getLength();
        in.seek(splitStart);
        parseArcBytes();

    }

    public void close() throws IOException {
        this.in.close();
    }

    public Text createKey() {
        return (Text) ReflectionUtils.newInstance(Text.class, conf);
    }

    public BytesWritable createValue() {
        return (BytesWritable) 
ReflectionUtils.newInstance(BytesWritable.class,
                conf);
    }

    public long getPos() throws IOException {
        return 0;
    }

    public float getProgress() throws IOException {

        if (recordIndex == 0) {
            return 0.0f;
        } else {
            // Progress is the fraction of records in this split that have
            // been returned so far. Cast before dividing: integer division
            // would yield 0 until the last record.
            return Math.min(1.0f, (float) recordIndex / startLens.length);
        }
    }

    public boolean next(Text key, BytesWritable value) throws IOException {
        long start = 0;
        long len = 0;
        try {
            LOG.info("NEXT Called ");
            int index = recordIndex++;
            if (index >= startLens.length) {
                LOG.info("BAD ");
                return false;
            } else {
                LOG.info("GOOD");
            }

            start = startLens[index][0];
            len = startLens[index][1];
            byte[] zipbytes = new byte[(int) len];

            LOG.info("start:" + start + "\tlen" + len);

            in.seek(start);
            in.read(zipbytes);

            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            ByteArrayInputStream zipin = new ByteArrayInputStream(zipbytes);
            GZIPInputStream zin = new GZIPInputStream(zipin);

            int gzipRead = -1;
            int totalGzipRead = 0;
            baos.reset();
            try {
                while ((gzipRead = zin.read(buffer, 0, buffer.length)) 
!= -1) {  // <--------SOURCE of exception
                    baos.write(buffer, 0, gzipRead);
                    totalGzipRead += gzipRead;
                }
            } catch (Exception ex) {
                ex.printStackTrace();
                LOG.info(ex.toString() + "\nstart:" + start + "\tlen" + len);
                // LOG.equals(...) only compared objects and logged nothing.
                LOG.error(StringUtils.stringifyException(ex));
            }

            byte[] pageBytes = baos.toByteArray();
            baos.close();
            zin.close();
            zipin.close();

//            GZIPInputStream gs = new GZIPInputStream(new ByteArrayInputStream(
//                    bytes.get()));

            // ObjectInputStream ois = new ObjectInputStream(zin);
            // for (int i = 0; i < 500000; i++) {
            // Employee sarah = null;
            // try {
            // sarah = (Employee) ois.readObject();
            // // LOG.info(sarah.printObject());
            // } catch (ClassNotFoundException e) {
            // // TODO Auto-generated catch block
            // LOG.info(e + "start:" + start + "\tlen" + len);
            // e.printStackTrace();
            // }
            // // sarah.print();
            //
            // }
           
             Text keyText = (Text) key;
             keyText.set(""+index);
             BytesWritable valueBytes = (BytesWritable) value;
             valueBytes.set(pageBytes, 0, pageBytes.length);

            return true;
        } catch (Exception e) {
            e.printStackTrace();
            LOG.info(e.toString() + "start:" + start + "\tlen" + len);
            // was LOG.equals(...), which only compares and logs nothing
            LOG.error(StringUtils.stringifyException(e));
            return false;
        }

    }
}
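
[Editor's sketch] The exception marked above is what java.util.zip raises when the byte range handed to GZIPInputStream stops before the end of a gzip member, which is the situation a wrongly computed record length would produce. The following is a minimal standalone illustration, not part of the original attachment; the class and method names (TruncatedGzipSketch, gzip, gunzip, decompresses) are made up for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class TruncatedGzipSketch {

    /** Compresses raw bytes into a single gzip member (in memory). */
    public static byte[] gzip(byte[] raw) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    /** Decompresses one gzip member; throws IOException if it is incomplete. */
    public static byte[] gunzip(byte[] member) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[4096];
        try (GZIPInputStream zin =
                new GZIPInputStream(new ByteArrayInputStream(member))) {
            int n;
            while ((n = zin.read(buffer, 0, buffer.length)) != -1) {
                out.write(buffer, 0, n);
            }
        }
        return out.toByteArray();
    }

    /** Returns true only if the bytes form a complete, readable gzip member. */
    public static boolean decompresses(byte[] member) {
        try {
            gunzip(member);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        byte[] full = gzip("record payload record payload".getBytes());
        // Simulates a record whose length was set too short.
        byte[] cut = Arrays.copyOf(full, full.length / 2);
        System.out.println("full member readable: " + decompresses(full));
        System.out.println("cut member readable:  " + decompresses(cut));
    }
}
```

If the stored (start, len) pair for a record is short by even a few bytes, the reader sees the second case, so checking the recorded lengths against the real member sizes is a reasonable first debugging step.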

***********************************************************************************

package org.apache.hadoop.mapred;
import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
/**
 * An input format that reads custom gzip files.
 */
public class CustomGzipInputFormat extends FileInputFormat<Text, 
BytesWritable> {
    public RecordReader<Text, BytesWritable> getRecordReader(InputSplit 
split,
            JobConf job, Reporter reporter) throws IOException {
        reporter.setStatus(split.toString());
        return new CustomGzipRecordReader(job, (FileSplit) split);
    }

}

***********************************************************************************


package org.apache.hadoop.examples;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.examples.WordCount.Reduce;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.CustomGzipInputFormat;
import org.apache.hadoop.mapred.Employee;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class CustomObjectReader extends Configured implements Tool,
        Mapper<Text, BytesWritable, Text, Text> {

    private JobConf jobConf;

    // public static final Log LOG = LogFactory.getLog(ArcSegmentCreator.class);

    public CustomObjectReader() {

    }

    public void configure(JobConf job) {
        this.jobConf = job;
    }

    public void close() {
    }

    public CustomObjectReader(Configuration conf) {
        setConf(conf);
    }

    public void map(Text key, BytesWritable bytes,
            OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
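        // bytes.get() returns the backing array, which may be longer than
        // the valid data, so limit the stream to getSize() bytes
        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(
                bytes.get(), 0, bytes.getSize()));
        long count =0;
        for (int i = 0; i < 500000; i++) {
            Employee sarah = null;

            try {
                sarah = (Employee) ois.readObject();
            } catch (ClassNotFoundException e) {
                System.out.println("EXCEPTION" + e);
                e.printStackTrace();
                continue; // nothing was deserialized, so nothing to print
            }
            sarah.print();
        }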

    }

    public void readArcs(Path arcFiles, Path outDir) throws IOException {

    }

    public int run(String[] args) throws Exception {

        String usage = "Usage: CustomObjectReader <input> <output>";

        if (args.length < 2) {
            System.err.println(usage);
            return -1;
        }



        JobConf job = new JobConf(getConf(), CustomObjectReader.class);
        job.setJobName("custom reader");

        job.setInputFormat(CustomGzipInputFormat.class);
        job.setMapperClass(CustomObjectReader.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setReducerClass(Reduce.class);

        List<String> other_args = new ArrayList<String>();
        for (int i = 0; i < args.length; ++i) {
            try {
                if ("-m".equals(args[i])) {
                    job.setNumMapTasks(Integer.parseInt(args[++i]));
                } else if ("-r".equals(args[i])) {
                    job.setNumReduceTasks(Integer.parseInt(args[++i]));
                } else {
                    other_args.add(args[i]);
                }
            } catch (NumberFormatException except) {
                System.out.println("ERROR: Integer expected instead of "
                        + args[i]);
                return printUsage();
            } catch (ArrayIndexOutOfBoundsException except) {
                System.out.println("ERROR: Required parameter missing from "
                        + args[i - 1]);
                return printUsage();
            }
        }
        // Make sure there are exactly 2 parameters left.
        if (other_args.size() != 2) {
            System.out.println("ERROR: Wrong number of parameters: "
                    + other_args.size() + " instead of 2.");
            return printUsage();
        }
        FileInputFormat.setInputPaths(job, other_args.get(0));
        FileOutputFormat.setOutputPath(job, new Path(other_args.get(1)));

        JobClient.runJob(job);
        return 0;

    }

    static int printUsage() {
        System.out
                .println("wordcount [-m <maps>] [-r <reduces>] <input> <output>");
        ToolRunner.printGenericCommandUsage(System.out);
        return -1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new 
CustomObjectReader(),
                args);
        System.exit(res);
    }

}


***********************************************************************************
package org.apache.hadoop.mapred;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class Employee implements Serializable {
    String name;
    int age;
    int salary;

    public Employee(String name, int age, int salary) {
        this.name = name;
        this.age = age;
        this.salary = salary;
    }

    public void print() {
        System.out.println("Record for: " + name);
        System.out.println("Name: " + name);
        System.out.println("Age: " + age);
        System.out.println("Salary: " + salary);
    }
   
    public String printObject() {
        return "name" +name +"\tage" + age +"\tSalary: " + salary;
    }

    public static void main(String argv[]) throws Exception {
        // create some objects
        org.apache.hadoop.mapred.Employee sarah = new Employee("S. Jordan",
                28, 56000);


            // serialize the objects sarah and sam
            FileOutputStream fos = new FileOutputStream(
                    "/home/amitsingh/OUTPUT/out.bin");
            GZIPOutputStream gz = new GZIPOutputStream(fos);
            ObjectOutputStream oos = new ObjectOutputStream(gz);

            for (int i = 0; i < 500000; i++) {
                org.apache.hadoop.mapred.Employee sam = new Employee(i + 
"MyNameIsGreat", i,
                        i + 50000);
                oos.writeObject(sam);
            }
            oos.flush();
            oos.close();
            //fos.close();

            // deserialize objects sarah and sam
            FileInputStream fis = new 
FileInputStream("/home/amitsingh/OUTPUT/out.bin");
            GZIPInputStream gs = new GZIPInputStream(fis);
            ObjectInputStream ois = new ObjectInputStream(gs);
            for (int i = 0; i < 10; i++) {
                sarah = (org.apache.hadoop.mapred.Employee) 
ois.readObject();
                sarah.print();

            }
            ois.close();
            //fis.close();
        }
   

}


DFS replication and Error Recovery on failure

Posted by amitsingh <am...@cse.iitb.ac.in>.
Hi,

Some queries:

1) If I set dfs.replication to 3 only in the hadoop-site.xml of the 
namenode (master) and then restart the cluster, will this take effect, 
or do I have to change hadoop-site.xml on all slaves?

2) What can be the possible cause of the following error at a datanode?
ERROR org.apache.hadoop.dfs.DataNode: java.io.IOException: Incompatible 
namespaceIDs in
 /mnt/hadoop28/HADOOP/hadoop-0.16.3/tmp/dir/hadoop-hadoop/dfs/data: 
namenode namespaceID = 1396640905; datanode namespaceID = 820259954

If my datanode goes down due to the above error, what should I do in the 
following scenarios?
1) I have some data on the corrupted datanode that I need to recover. 
How can I recover that data?
2) If I don't care about the data but want the node back in the 
cluster, can I just delete /mnt/hadoop28/HADOOP/hadoop-0.16.3/tmp 
and include the node back in the cluster?


Regards
Amit

Re: File Splits in Hadoop

Posted by Tarandeep Singh <ta...@gmail.com>.
On Wed, Dec 10, 2008 at 11:12 AM, amitsingh <am...@cse.iitb.ac.in> wrote:

> Hi,
>
> I am stuck with some questions based on following scenario.
>
> 1) Hadoop normally splits the input file and distributes the splits across
> slaves(referred to as Psplits from now), in to chunks of 64 MB.
> a) Is there Any way to specify split criteria  so for example a huge 4 GB
> file is split in to 40 odd files(Psplits) respecting record boundaries ?


You can set mapred.min.split.size in the JobConf. Setting it to a value
greater than the block size forces each split to be larger than a block.
However, this may produce splits whose data blocks are not all local to
the node running the mapper.
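
[Editor's sketch] To make the effect of mapred.min.split.size concrete, here is a small standalone calculation. It assumes the split size is chosen as max(minSize, min(goalSize, blockSize)), which is how the old-API FileInputFormat computes it as far as I know; the class name, the splitCount helper, and the figures (4 GB file, 64 MB blocks, 2 requested maps, 100 MB minimum) are illustrative assumptions, and the count ignores the small slack FileInputFormat allows on the last split.

```java
public class SplitSizeSketch {

    /** Assumed formula: max(minSize, min(goalSize, blockSize)). */
    public static long computeSplitSize(long goalSize, long minSize, long blockSize) {
        return Math.max(minSize, Math.min(goalSize, blockSize));
    }

    /** Approximate number of splits: file size divided by split size, rounded up. */
    public static long splitCount(long fileSize, long splitSize) {
        return (fileSize + splitSize - 1) / splitSize;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long fileSize = 4096 * mb;     // the 4 GB file from the question
        long blockSize = 64 * mb;      // default HDFS block size at the time
        long goalSize = fileSize / 2;  // total size / requested maps (2 assumed)

        // Default minimum split size of 1 byte: splits follow the block size.
        long defaultSplit = computeSplitSize(goalSize, 1, blockSize);
        // Minimum raised above the block size: splits grow to the minimum.
        long forcedSplit = computeSplitSize(goalSize, 100 * mb, blockSize);

        System.out.println("default: " + defaultSplit / mb + " MB per split, "
                + splitCount(fileSize, defaultSplit) + " splits");
        System.out.println("forced:  " + forcedSplit / mb + " MB per split, "
                + splitCount(fileSize, forcedSplit) + " splits");
    }
}
```

Under these assumptions, a minimum of about 100 MB gives the "40 odd" splits asked about, but each split then spans more than one block, so part of its data will usually be read over the network. Note that this only controls split sizes; it does not by itself align splits with record boundaries.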


>
> b) Is it even required that these physical splits(Psplits) obey record
> boundaries ?
>
> 2) We can get locations of these Psplits on HDFS as follows
> BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0,  length);
> //FileInputFormat line 273
> In FileInputFormat, for each blkLocations(Psplit) multiple logical
> splits(referred to as Lsplits from now) are created based on hueristic for
> number of mappers.
>
> Q) How is following situation handled in TextInputFormat which reads line
> by line,
>   i) Input File is split as described in step 1 in more than 2 parts
>   ii) Suppose there is a line of text which starts near end of Psplit-i and
> end in Psplit-i+1 (say Psplit2 and Psplit3)
>   iii) Which mapper gets this line spanning multiple Psplits(mapper_i or
> mapper_i+1)
>   iv) I went through the FileInputFormat code, Lsplits are done only for a
> particular pSplit not across pSplit. Why so ?
>
> Q) In short, If one has to read arbitary objects(not line), how does one
> handle records which are partially in one PSplit and partially in other.
>

I am working on this as well and have not found an exact answer, but in my
view mapper_i should handle the line / record that lies partially in one
split and partially in the next. mapper_i+1 should first seek to the
beginning of the next record (a line, in this case) and start processing
from there.
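
[Editor's sketch] The rule described above can be tried out on an in-memory byte array. This is only an illustration of the idea for newline-terminated records, not the actual TextInputFormat / LineRecordReader code; the class and method names are made up. A reader whose split does not start at offset 0 backs up one byte and skips to just past the next newline, and every reader keeps going until it has finished the last line that starts before its split end, even if that line runs into the next split.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class SplitBoundarySketch {

    /** Returns the lines owned by the split [start, end) of data. */
    public static List<String> readSplit(byte[] data, int start, int end) {
        int pos = start;
        if (start != 0) {
            // Back up one byte so that a line beginning exactly at 'start'
            // is kept, then skip the tail of the line the previous reader owns.
            pos = start - 1;
            while (pos < data.length && data[pos] != '\n') {
                pos++;
            }
            pos++; // step over the newline
        }
        List<String> lines = new ArrayList<>();
        // A line belongs to this split if it STARTS before 'end';
        // it is read to its terminating newline even past 'end'.
        while (pos < end && pos < data.length) {
            int lineStart = pos;
            while (pos < data.length && data[pos] != '\n') {
                pos++;
            }
            lines.add(new String(data, lineStart, pos - lineStart,
                    StandardCharsets.UTF_8));
            pos++; // step over the newline
        }
        return lines;
    }

    public static void main(String[] args) {
        byte[] data = "alpha\nbravo\ncharlie\ndelta\n"
                .getBytes(StandardCharsets.UTF_8);
        // Split boundaries at 8 and 16 fall in the middle of lines.
        System.out.println(readSplit(data, 0, 8));
        System.out.println(readSplit(data, 8, 16));
        System.out.println(readSplit(data, 16, data.length));
    }
}
```

Each line is returned by exactly one reader. For arbitrary binary records the same approach needs some way to find the start of the next record from an arbitrary offset (a sync marker or an index of record offsets), because there is no newline to scan for.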

Someone from the Hadoop core team, please correct me if this is wrong and
fill in the details.

Thanks,
Taran


> --Amit
>
>
>
>