You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-user@hadoop.apache.org by ranjith k <ra...@gmail.com> on 2011/04/07 06:56:52 UTC

Creating custom input split.l

Hello.

I need to create a custom input split. I need to split my input in to 50
line for one input split. How can i do it.
And also there is an another problem for me. I have a file. But it is not in
the form of text. It contain structure. I need to give one structure in to
my map function as value. And the number of the record is my key. How can i
achieve this. please help me.

Ranjith.

Re: Creating custom input split.l

Posted by Steve Lewis <lo...@gmail.com>.
This is a custom splitter for extracting XML Tags from XML documents - to
use is subclass as follows. It assumes the document
is pretty printed with the start and end tags on separate lines.  You may
use this as an example

public class MyTagInputFormat extends XMLTagInputFormat
{
     public MyTagInputFormat() {
              super("MyTag");
     }
}

package org.systemsbiology.hadoop;

/**
 * User: steven
 * Date: 3/7/11
 */

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.util.*;

import java.io.*;

/**
 * org.systemsbiology.xtandem.hadoop.MzXMLInputFormat
 * Splitter that reads scan tags from a MzXML file which is
 * nice enough to put the begin and end tags on separate lines
 */
public class XMLTagInputFormat extends FileInputFormat<Text, Text> {
    public static final XMLTagInputFormat[] EMPTY_ARRAY = {};

    private final String m_BaseTag;
    private final String m_StartTag;
    private final String m_EndTag;

    public XMLTagInputFormat(final String pBaseTag) {
        m_BaseTag = pBaseTag;
        m_StartTag = "<"  + pBaseTag;
        m_EndTag = "</"  + pBaseTag + ">";

    }

    public String getStartTag() {
        return m_StartTag;
    }

    public String getBaseTag() {
        return m_BaseTag;
    }

    public String getEndTag() {
        return m_EndTag;
    }

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split,
                       TaskAttemptContext context) {
        return new MyWholeFileReader();
    }

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return true;
    }

    /**
     * Custom RecordReader which returns the entire file as a
     * single value with the name as a key
     * Value is the entire file
     * Key is the file name
     */
    public   class MyWholeFileReader extends RecordReader<Text, Text> {

        private CompressionCodecFactory compressionCodecs = null;
        private long start;
        private long end;
        private long current;
              private LineReader in;
        private Text key = null;
        private Text value = null;
        private Text buffer = new Text();

        public void initialize(InputSplit genericSplit,
                               TaskAttemptContext context) throws
IOException {
            FileSplit split = (FileSplit) genericSplit;
            Configuration job = context.getConfiguration();
            start = split.getStart();
            end = start + split.getLength();
            final Path file = split.getPath();
            compressionCodecs = new CompressionCodecFactory(job);
            final CompressionCodec codec = compressionCodecs.getCodec(file);

            // open the file and seek to the start of the split
            FileSystem fs = file.getFileSystem(job);
            FSDataInputStream fileIn = fs.open(split.getPath());
            if (codec != null) {
                in = new LineReader(codec.createInputStream(fileIn), job);
                end = Long.MAX_VALUE;
            }
            else {
                in = new LineReader(fileIn, job);
            }
            current = start;
            if (key == null) {
                key = new Text();
            }
            key.set(split.getPath().getName());
            if (value == null) {
                value = new Text();
            }

        }

        /**
         * look for a <scan tag then read until it closes
         * @return   true if there is data
         * @throws java.io.IOException
         */
        public boolean nextKeyValue() throws IOException {
            int newSize = 0;
            StringBuilder sb = new StringBuilder();
            newSize = in.readLine(buffer);
            String str = null;
            while (newSize > 0) {
                 str = buffer.toString();
                 if(str.contains(getStartTag()))
                     break;
                newSize = in.readLine(buffer);
             }
            if(newSize == 0)   {
                key = null;
                 value = null;
                 return false;

            }
             while (newSize > 0) {
                str = buffer.toString();
                 sb.append(str);
                 sb.append("\n");
                 if(str.contains(getEndTag()))
                     break;
                 newSize = in.readLine(buffer);
             }

            String s = sb.toString();
            value.set(s);

            if (sb.length() == 0) {
                key = null;
                value = null;
                return false;
            }
            else {
                return true;
            }
        }

        @Override
        public Text getCurrentKey() {
            return key;
        }

        @Override
        public Text getCurrentValue() {
            return value;
        }

        /**
         * Get the progress within the split
         */
        public float getProgress() {
            return ((float)current - start)/ (start - end) ;
        }

        public synchronized void close() throws IOException {
            if (in != null) {
                in.close();
            }
        }
    }
}


On Sat, Apr 9, 2011 at 1:18 AM, Harsh J <ha...@cloudera.com> wrote:

> Hello Ranjith,
>
> On Thu, Apr 7, 2011 at 10:26 AM, ranjith k <ra...@gmail.com> wrote:
> > Hello.
> >
> > I need to create a custom input split. I need to split my input in to 50
> > line for one input split. How can i do it.
>
> Maybe you are looking for the NLineInputFormat? It creates input
> splits for every defined N lines.
>
> > And also there is an another problem for me. I have a file. But it is not
> in
> > the form of text. It contain structure. I need to give one structure in
> to
> > my map function as value. And the number of the record is my key. How can
> i
> > achieve this. please help me.
>
> You will need to implement a custom RecordReader for this; basically
> you'll have to read your file and structure it to your specs using low
> level byte reads off a DFS input stream for your file. Computing the
> number of records in the same go may not be possible if the file/split
> is too large to be held in the memory, but you may create a
> SequenceFile out of this, which has the records count as the key to a
> chunk of records as value.
>
> --
> Harsh J
>



-- 
Steven M. Lewis PhD
4221 105th Ave NE
Kirkland, WA 98033
206-384-1340 (cell)
Skype lordjoe_com

Re: Creating custom input split.l

Posted by Harsh J <ha...@cloudera.com>.
Hello Ranjith,

On Thu, Apr 7, 2011 at 10:26 AM, ranjith k <ra...@gmail.com> wrote:
> Hello.
>
> I need to create a custom input split. I need to split my input in to 50
> line for one input split. How can i do it.

Maybe you are looking for the NLineInputFormat? It creates input
splits for every defined N lines.

> And also there is an another problem for me. I have a file. But it is not in
> the form of text. It contain structure. I need to give one structure in to
> my map function as value. And the number of the record is my key. How can i
> achieve this. please help me.

You will need to implement a custom RecordReader for this; basically
you'll have to read your file and structure it to your specs using low
level byte reads off a DFS input stream for your file. Computing the
number of records in the same go may not be possible if the file/split
is too large to be held in the memory, but you may create a
SequenceFile out of this, which has the records count as the key to a
chunk of records as value.

-- 
Harsh J