Posted to mapreduce-user@hadoop.apache.org by Erik T <er...@gmail.com> on 2011/07/11 20:57:25 UTC

New to hadoop, trying to write a custom file split

Hello everyone,

I'm new to Hadoop and I'm trying to figure out how to design an M/R program
to parse a file and generate a PMML file as output.

What I would like to do is split a file at a keyword instead of at a given
number of lines, because the location of the split can change from time to time.

I've been looking around and thought KeyValueTextInputFormat might be the way
to go, but I'm not finding any clear examples of how to use it, so I'm not
sure whether it's the right choice.
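For context, KeyValueTextInputFormat still reads input line by line; it only divides each line at the first separator character (tab by default), so it would not group multiple lines under a keyword. A minimal pure-Java sketch of that per-line behavior (hypothetical class and method names, not Hadoop code):

```java
// Pure-Java sketch of KeyValueTextInputFormat's per-line behavior:
// everything before the first separator is the key, the rest is the
// value. Note the split happens within one line -- the format never
// groups several lines into one record.
public class KvSplitDemo {
    static String[] splitLine(String line, char separator) {
        int i = line.indexOf(separator);
        if (i < 0)
            return new String[]{line, ""}; // no separator: whole line becomes the key
        return new String[]{line.substring(0, i), line.substring(i + 1)};
    }

    public static void main(String[] args) {
        String[] kv = splitLine("more info\tetc.", '\t');
        System.out.println(kv[0] + " | " + kv[1]); // prints: more info | etc.
    }
}
```

The separator itself is configurable through the job configuration; the point is only that the division is within a line, never across lines.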

Here is a basic input example of what I'm working with.

[Input file info]
more info
more info
etc.
etc.
*Keyword*
different info
different info
*Keyword*
some more info

For the example above, each section can be generated independently of the
others. However, within each section, the lines depend on one another to
generate a valid PMML file.

Can anyone suggest what type of input format I should use?

Thanks for your time
Erik

Re: New to hadoop, trying to write a custom file split

Posted by Arun C Murthy <ac...@hortonworks.com>.
Hey Steve,

 Want to contribute it as an example to MR? Would love to help.

thanks,
Arun

On Jul 11, 2011, at 12:11 PM, Steve Lewis wrote:

> Look at this sample 
> =============================================
> package org.systemsbiology.hadoop;
> 
> 
> 
> import org.apache.hadoop.conf.*;
> import org.apache.hadoop.fs.*;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.io.*;
> import org.apache.hadoop.io.compress.*;
> import org.apache.hadoop.mapreduce.*;
> import org.apache.hadoop.mapreduce.lib.input.*;
> 
> import java.io.*;
> import java.util.*;
> 
> /**
>  * org.systemsbiology.xtandem.hadoop.XMLTagInputFormat
>  * Splitter that reads scan tags from an XML file.
>  * No assumption is made about line boundaries, but start and end tags MUST look like <MyTag </MyTag> with no embedded spaces.
>  * Usually you will subclass this and hard-code the tag you want to split on.
>  */
> public class XMLTagInputFormat extends FileInputFormat<Text, Text> {
>     public static final XMLTagInputFormat[] EMPTY_ARRAY = {};
> 
> 
>     private static final double SPLIT_SLOP = 1.1;   // 10% slop
> 
> 
>     public static final int BUFFER_SIZE = 4096;
> 
>     private final String m_BaseTag;
>     private final String m_StartTag;
>     private final String m_EndTag;
>     private String m_Extension;
> 
>     public XMLTagInputFormat(final String pBaseTag) {
>         m_BaseTag = pBaseTag;
>         m_StartTag = "<" + pBaseTag;
>         m_EndTag = "</" + pBaseTag + ">";
> 
>     }
> 
>     public String getExtension() {
>         return m_Extension;
>     }
> 
>     public void setExtension(final String pExtension) {
>         m_Extension = pExtension;
>     }
> 
>     public boolean isSplitReadable(InputSplit split) {
>         if (!(split instanceof FileSplit))
>             return true;
>         FileSplit fsplit = (FileSplit) split;
>         Path path1 = fsplit.getPath();
>         return isPathAcceptable(path1);
>     }
> 
>     protected boolean isPathAcceptable(final Path pPath1) {
>         String path = pPath1.toString().toLowerCase();
>         if (pPath1.getName().startsWith("part-r-"))  // test the file name, not the full path
>             return true;
>         String extension = getExtension();
>         if (extension != null && path.endsWith(extension.toLowerCase()))
>             return true;
>         if (extension != null && path.endsWith(extension.toLowerCase() + ".gz"))
>             return true;
>         if (extension == null )
>             return true;
>         return false;
>     }
> 
>     public String getStartTag() {
>         return m_StartTag;
>     }
> 
>     public String getBaseTag() {
>         return m_BaseTag;
>     }
> 
>     public String getEndTag() {
>         return m_EndTag;
>     }
> 
>     @Override
>     public RecordReader<Text, Text> createRecordReader(InputSplit split,
>                                                        TaskAttemptContext context) {
>         if (isSplitReadable(split))
>             return new MyXMLFileReader();
>         else
>             return NullRecordReader.INSTANCE; // do not read
>     }
> 
>     @Override
>     protected boolean isSplitable(JobContext context, Path file) {
>         String fname = file.getName().toLowerCase();
>         if(fname.endsWith(".gz"))
>             return false;
>         return true;
>     }
> 
>     /**
>      * Generate the list of files and make them into FileSplits.
>      * This is copied from FileInputFormat so that a filter on acceptable data can be inserted.
>      */
>     @Override
>     public List<InputSplit> getSplits(JobContext job
>     ) throws IOException {
>         long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
>         long maxSize = getMaxSplitSize(job);
> 
>         // generate splits
>         List<InputSplit> splits = new ArrayList<InputSplit>();
>         for (FileStatus file : listStatus(job)) {
>             Path path = file.getPath();
>             if (!isPathAcceptable(path))   // filter acceptable data
>                 continue;
>             FileSystem fs = path.getFileSystem(job.getConfiguration());
>             long length = file.getLen();
>             BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
>             if ((length != 0) && isSplitable(job, path)) {
>                 long blockSize = file.getBlockSize();
>                 long splitSize = computeSplitSize(blockSize, minSize, maxSize);
> 
>                 long bytesRemaining = length;
>                 while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
>                     int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
>                     splits.add(new FileSplit(path, length - bytesRemaining, splitSize,
>                             blkLocations[blkIndex].getHosts()));
>                     bytesRemaining -= splitSize;
>                 }
> 
>                 if (bytesRemaining != 0) {
>                     splits.add(new FileSplit(path, length - bytesRemaining, bytesRemaining,
>                             blkLocations[blkLocations.length - 1].getHosts()));
>                 }
>             }
>             else if (length != 0) {
>                 splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
>             }
>             else {
>                 //Create empty hosts array for zero length files
>                 splits.add(new FileSplit(path, 0, length, new String[0]));
>             }
>         }
>     //    LOG.debug("Total # of splits: " + splits.size());
>         return splits;
>     }
> 
>     /**
>      * Custom RecordReader which returns each complete tag section as a
>      * single m_Value, with the file name as the m_Key.
>      */
>     public class MyXMLFileReader extends RecordReader<Text, Text> {
> 
>         private CompressionCodecFactory compressionCodecs = null;
>         private long m_Start;
>         private long m_End;
>         private long m_Current;
>         private BufferedReader m_Input;
>         private Text m_Key;
>         private Text m_Value = null;
>         private char[] m_Buffer = new char[BUFFER_SIZE];
>         StringBuilder m_Sb = new StringBuilder();
> 
>         public void initialize(InputSplit genericSplit,
>                                TaskAttemptContext context) throws IOException {
>             FileSplit split = (FileSplit) genericSplit;
>             Configuration job = context.getConfiguration();
>             m_Sb.setLength(0);
>             m_Start = split.getStart();
>             m_End = m_Start + split.getLength();
>             final Path file = split.getPath();
>             compressionCodecs = new CompressionCodecFactory(job);
>             final CompressionCodec codec = compressionCodecs.getCodec(file);
> 
>             // open the file and seek to the m_Start of the split
>             FileSystem fs = file.getFileSystem(job);
>             FSDataInputStream fileIn = fs.open(split.getPath());
>             if (codec != null) {
>                 CompressionInputStream inputStream = codec.createInputStream(fileIn);
>                 m_Input = new BufferedReader(new InputStreamReader(inputStream));
>                 m_End = Long.MAX_VALUE;
>             }
>             else {
>                 m_Input = new BufferedReader(new InputStreamReader(fileIn));
>             }
>             m_Current = m_Start;
>             if (m_Key == null) {
>                 m_Key = new Text();
>             }
>             m_Key.set(split.getPath().getName());
>             if (m_Value == null) {
>                 m_Value = new Text();
>             }
> 
>         }
> 
>         /**
>          * look for a <scan tag then read until it closes
>          *
>          * @return true if there is data
>          * @throws java.io.IOException
>          */
>         public boolean nextKeyValue() throws IOException {
>             if(readFromCurrentBuffer())
>                 return true;
>             int newSize = 0;
>             String startTag = getStartTag() + " ";
>             String startTag2 = getStartTag() + ">";
>             newSize = m_Input.read(m_Buffer);
> 
>             while (newSize > 0) {
>                 m_Current += newSize;
>                 m_Sb.append(m_Buffer, 0, newSize);
>                 if( readFromCurrentBuffer())
>                     return true;
>                 newSize = m_Input.read(m_Buffer);
>             }
>             // exit because we are at the m_End
>             if (newSize <= 0) {
>                 m_Key = null;
>                 m_Value = null;
>                 return false;
>             }
> 
>             return true;
>         }
> 
>         protected boolean readFromCurrentBuffer()
>         {
>             String endTag = getEndTag();
>               String startText = m_Sb.toString();
>             if(!startText.contains(endTag))
>                 return false; // need more read
>             String startTag = getStartTag() + " ";
>              String startTag2 = getStartTag() + ">";
>             int index = startText.indexOf(startTag);
>             if (index == -1)
>                 index = startText.indexOf(startTag2);
>             if(index == -1)
>                 return false;
>             startText = startText.substring(index);
>             m_Sb.setLength(0);
>             m_Sb.append(startText);
> 
>             String s = startText;
>             index = s.indexOf(endTag);
>             if (index == -1)
>                 return false; // need more read
>                // throw new IllegalStateException("unmatched tag " + getBaseTag());
>             index += endTag.length();
>             String tag = s.substring(0, index).trim();
>             m_Value.set(tag);
> 
>             // keep the remaining text to add to the next tag
>             m_Sb.setLength(0);
>             String rest = s.substring(index);
>             m_Sb.append(rest);
>             return true;
>         }
> 
>         @Override
>         public Text getCurrentKey() {
>             return m_Key;
>         }
> 
>         @Override
>         public Text getCurrentValue() {
>             return m_Value;
>         }
> 
>         /**
>          * Get the progress within the split
>          */
>         public float getProgress() {
>             if (m_End == m_Start)
>                 return 0.0f;
>             return ((float) (m_Current - m_Start)) / (m_End - m_Start);
>         }
> 
>         public synchronized void close() throws IOException {
>             if (m_Input != null) {
>                 m_Input.close();
>             }
>         }
>     }
> }
> 
> =============================================
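The record-scanning logic above can be sketched in isolation: the reader accumulates text, finds a start tag (with or without attributes), cuts at the matching end tag, and carries the remainder forward for the next record. A hypothetical standalone illustration, not Hadoop code:

```java
// Standalone sketch of readFromCurrentBuffer's approach: probe for
// "<tag " or "<tag>", cut at the matching "</tag>", and keep the
// leftover text for the next record. Hypothetical class name.
import java.util.ArrayList;
import java.util.List;

public class TagScanDemo {
    static List<String> extractTags(String text, String base) {
        List<String> out = new ArrayList<String>();
        String endTag = "</" + base + ">";
        StringBuilder sb = new StringBuilder(text);
        while (true) {
            String s = sb.toString();
            int start = s.indexOf("<" + base + " ");       // tag with attributes
            if (start == -1) start = s.indexOf("<" + base + ">"); // bare tag
            if (start == -1) break;
            int end = s.indexOf(endTag, start);
            if (end == -1) break;                          // incomplete: need more input
            end += endTag.length();
            out.add(s.substring(start, end).trim());
            sb.setLength(0);
            sb.append(s.substring(end));                   // remainder feeds the next record
        }
        return out;
    }

    public static void main(String[] args) {
        String xml = "junk<scan num=\"1\">a</scan><scan>b</scan>tail";
        for (String t : extractTags(xml, "scan"))
            System.out.println(t); // prints the two <scan> records
    }
}
```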
> 
> On Mon, Jul 11, 2011 at 11:57 AM, Erik T <er...@gmail.com> wrote:
> [original question elided; quoted in full above]
> 
> -- 
> Steven M. Lewis PhD
> 4221 105th Ave NE
> Kirkland, WA 98033
> 206-384-1340 (cell)
> Skype lordjoe_com
> 
> 
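The split arithmetic in getSplits above can also be sketched standalone: splitSize is the block size clamped between the configured min and max, and SPLIT_SLOP lets the final split run up to 10% past splitSize instead of emitting a tiny trailing split. A pure-Java sketch of just that arithmetic (hypothetical class name):

```java
// Sketch of the split-size arithmetic in getSplits: one split per
// blockSize (clamped by min/max), with the SPLIT_SLOP rule absorbing
// a short tail of up to 10% into the last split.
public class SplitMathDemo {
    static final double SPLIT_SLOP = 1.1; // 10% slop

    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    static int countSplits(long length, long splitSize) {
        int n = 0;
        long remaining = length;
        while ((double) remaining / splitSize > SPLIT_SLOP) {
            n++;
            remaining -= splitSize;
        }
        if (remaining != 0) n++; // the (possibly slop-sized) final split
        return n;
    }

    public static void main(String[] args) {
        long split = computeSplitSize(64L << 20, 1L, Long.MAX_VALUE); // 64 MB blocks
        System.out.println(countSplits(70L << 20, split));  // 70 MB fits the slop: 1 split
        System.out.println(countSplits(200L << 20, split)); // 200 MB: 4 splits
    }
}
```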


Re: New to hadoop, trying to write a custom file split

Posted by Steve Lewis <lo...@gmail.com>.
You are correct: they got refactored into readFromCurrentBuffer.
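Inside readFromCurrentBuffer, the check probes both start-tag forms. A standalone sketch of why two probes are needed (hypothetical class name):

```java
// Why two start-tag probes: a tag with attributes begins "<scan "
// while a bare tag begins "<scan>", and an indexOf on one form
// misses the other. Probing both also avoids false matches on
// longer tag names like <scanner>.
public class StartTagDemo {
    static int findStart(String text, String base) {
        int i = text.indexOf("<" + base + " ");        // "<scan attr=..." form
        if (i == -1) i = text.indexOf("<" + base + ">"); // bare "<scan>" form
        return i;
    }

    public static void main(String[] args) {
        System.out.println(findStart("xx<scan num=\"7\">", "scan")); // 2
        System.out.println(findStart("xx<scan>", "scan"));           // 2
        System.out.println(findStart("xx<scanner>", "scan"));        // -1: no false match
    }
}
```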

On Mon, Jul 18, 2011 at 11:47 AM, Erik T <er...@gmail.com> wrote:

> I understand that part but I don't see startTag or startTag2 used in the
> nextKeyValue method after they have been declared.
> Erik
>
>
>
> On 18 July 2011 14:20, Steve Lewis <lo...@gmail.com> wrote:
>
>> The reason for the two is that the tag may appear as
>> <Foo> ....
>> or
>> <Foo attr1="...
>> - now I suppose you could just look for <Foo, which would cover either case.
>>
>> Also note I am cheating a bit: this will not properly handle tags that are
>> commented out with the XML comment <!--, but I doubt it is possible to
>> handle those without parsing the entire (potentially large) file.
>>
>>
>> On Mon, Jul 18, 2011 at 9:40 AM, Erik T <er...@gmail.com> wrote:
>>
>>> Hi Steven,
>>>
>>> Thank you for the sample. I have one question though.
>>>
>>> In MyXMLFileReader, nextKeyValue, is startTag and startTag2 needed?
>>>  Erik
>>>
>>>
>>>
>>> On 11 July 2011 15:11, Steve Lewis <lo...@gmail.com> wrote:
>>>
>>>> [code sample and original question elided; quoted in full above]
>>
>



Re: New to hadoop, trying to write a custom file split

Posted by Erik T <er...@gmail.com>.
I understand that part but I don't see startTag or startTag2 used in the
nextKeyValue method after they have been declared.
Erik


On 18 July 2011 14:20, Steve Lewis <lo...@gmail.com> wrote:

> [earlier replies and code sample elided; quoted in full above]
>>>      * Key is the file name
>>>      */
>>>     public class MyXMLFileReader extends RecordReader<Text, Text> {
>>>
>>>         private CompressionCodecFactory compressionCodecs = null;
>>>         private long m_Start;
>>>         private long m_End;
>>>         private long m_Current;
>>>         private BufferedReader m_Input;
>>>         private Text m_Key;
>>>         private Text m_Value = null;
>>>         private char[] m_Buffer = new char[BUFFER_SIZE];
>>>         StringBuilder m_Sb = new StringBuilder();
>>>
>>>         public void initialize(InputSplit genericSplit,
>>>                                TaskAttemptContext context) throws
>>> IOException {
>>>             FileSplit split = (FileSplit) genericSplit;
>>>             Configuration job = context.getConfiguration();
>>>             m_Sb.setLength(0);
>>>             m_Start = split.getStart();
>>>             m_End = m_Start + split.getLength();
>>>             final Path file = split.getPath();
>>>             compressionCodecs = new CompressionCodecFactory(job);
>>>             final CompressionCodec codec =
>>> compressionCodecs.getCodec(file);
>>>
>>>             // open the file and seek to the m_Start of the split
>>>             FileSystem fs = file.getFileSystem(job);
>>>             FSDataInputStream fileIn = fs.open(split.getPath());
>>>             if (codec != null) {
>>>                 CompressionInputStream inputStream =
>>> codec.createInputStream(fileIn);
>>>                 m_Input = new BufferedReader(new
>>> InputStreamReader(inputStream));
>>>                 m_End = Long.MAX_VALUE;
>>>             }
>>>             else {
>>>                 m_Input = new BufferedReader(new
>>> InputStreamReader(fileIn));
>>>             }
>>>             m_Current = m_Start;
>>>             if (m_Key == null) {
>>>                 m_Key = new Text();
>>>             }
>>>             m_Key.set(split.getPath().getName());
>>>             if (m_Value == null) {
>>>                 m_Value = new Text();
>>>             }
>>>
>>>         }
>>>
>>>         /**
>>>          * look for a <scan tag then read until it closes
>>>          *
>>>          * @return true if there is data
>>>          * @throws java.io.IOException
>>>          */
>>>         public boolean nextKeyValue() throws IOException {
>>>             if(readFromCurrentBuffer())
>>>                 return true;
>>>             int newSize = 0;
>>>             String startTag = getStartTag() + " ";
>>>             String startTag2 = getStartTag() + ">";
>>>             newSize = m_Input.read(m_Buffer);
>>>
>>>             while (newSize > 0) {
>>>                 m_Current += newSize;
>>>                 m_Sb.append(m_Buffer, 0, newSize);
>>>                 if( readFromCurrentBuffer())
>>>                     return true;
>>>                 newSize = m_Input.read(m_Buffer);
>>>             }
>>>             // exit because we are at the m_End
>>>             if (newSize <= 0) {
>>>                 m_Key = null;
>>>                 m_Value = null;
>>>                 return false;
>>>             }
>>>
>>>             return true;
>>>         }
>>>
>>>         protected boolean readFromCurrentBuffer()
>>>         {
>>>             String endTag = getEndTag();
>>>               String startText = m_Sb.toString();
>>>             if(!startText.contains(endTag))
>>>                 return false; // need more read
>>>             String startTag = getStartTag() + " ";
>>>              String startTag2 = getStartTag() + ">";
>>>             int index = startText.indexOf(startTag);
>>>             if (index == -1)
>>>                 index = startText.indexOf(startTag2);
>>>             if(index == -1)
>>>                 return false;
>>>             startText = startText.substring(index);
>>>             m_Sb.setLength(0);
>>>             m_Sb.append(startText);
>>>
>>>             String s = startText;
>>>             index = s.indexOf(endTag);
>>>             if (index == -1)
>>>                 return false; // need more read
>>>                // throw new IllegalStateException("unmatched tag " +
>>> getBaseTag());
>>>             index += endTag.length();
>>>             String tag = s.substring(0, index).trim();
>>>             m_Value.set(tag);
>>>
>>>             // keep the remaining text to add to the next tag
>>>             m_Sb.setLength(0);
>>>             String rest = s.substring(index);
>>>             m_Sb.append(rest);
>>>             return true;
>>>         }
>>>
>>>         @Override
>>>         public Text getCurrentKey() {
>>>             return m_Key;
>>>         }
>>>
>>>         @Override
>>>         public Text getCurrentValue() {
>>>             return m_Value;
>>>         }
>>>
>>>         /**
>>>          * Get the progress within the split
>>>          */
>>>         public float getProgress() {
>>>             return (m_End == m_Start) ? 0.0f : ((float) (m_Current - m_Start)) / (m_End - m_Start);
>>>         }
>>>
>>>         public synchronized void close() throws IOException {
>>>             if (m_Input != null) {
>>>                 m_Input.close();
>>>             }
>>>         }
>>>     }
>>> }
>>>
>>> =============================================
>>>
>>>
>>> On Mon, Jul 11, 2011 at 11:57 AM, Erik T <er...@gmail.com> wrote:
>>>
>>>> [original question trimmed]
>>>
>>>
>>>
>>> --
>>> Steven M. Lewis PhD
>>> 4221 105th Ave NE
>>> Kirkland, WA 98033
>>> 206-384-1340 (cell)
>>> Skype lordjoe_com
>>>
>>>
>>>
>>
>
>
> --
> Steven M. Lewis PhD
> 4221 105th Ave NE
> Kirkland, WA 98033
> 206-384-1340 (cell)
> Skype lordjoe_com
>
>
>

Re: New to hadoop, trying to write a customary file split

Posted by Steve Lewis <lo...@gmail.com>.
The reason for the two is that the tag may appear as
<Foo> ....
or
<Foo attr1="...
- now I suppose you could just look for <Foo, which would cover either case.

Also note I am cheating a bit: this will not properly handle tags that are
commented out with an XML comment (<!-- ... -->), but I doubt it is possible
to handle those without parsing the entire (potentially large) file.
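One caveat with looking for the bare prefix <Foo: it also matches longer tag
names such as <Foobar>, which is why the reader checks the two exact variants
("<Foo " and "<Foo>"). A minimal sketch of that check - a hypothetical helper,
not part of the original class:

```java
// Hypothetical helper (not in the original sample): find the first real
// occurrence of <tag ...> or <tag>, skipping longer tag names like <tagbar>.
public class StartTagMatch {

    // Returns the index of the first "<tag " or "<tag>" occurrence, or -1.
    public static int indexOfStartTag(String text, String tag) {
        int i = text.indexOf("<" + tag + " ");   // start tag with attributes
        int j = text.indexOf("<" + tag + ">");   // start tag without attributes
        if (i == -1) return j;
        if (j == -1) return i;
        return Math.min(i, j);
    }

    public static void main(String[] args) {
        String xml = "<Foobar a=\"1\"/><Foo attr=\"2\">body</Foo>";
        System.out.println(xml.indexOf("<Foo"));         // 0  - false hit on <Foobar
        System.out.println(indexOfStartTag(xml, "Foo")); // 15 - the real <Foo tag
    }
}
```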


On Mon, Jul 18, 2011 at 9:40 AM, Erik T <er...@gmail.com> wrote:

> Hi Steven,
>
> Thank you for the sample. I have one question though.
>
> In MyXMLFileReader, nextKeyValue, is startTag and startTag2 needed?
> Erik
>
>
>
> On 11 July 2011 15:11, Steve Lewis <lo...@gmail.com> wrote:
>
>> [quoted code sample and message trimmed]


-- 
Steven M. Lewis PhD
4221 105th Ave NE
Kirkland, WA 98033
206-384-1340 (cell)
Skype lordjoe_com

Re: New to hadoop, trying to write a customary file split

Posted by Erik T <er...@gmail.com>.
Hi Steven,

Thank you for the sample. I have one question though.

In MyXMLFileReader, nextKeyValue, is startTag and startTag2 needed?
Erik
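For what it is worth, nextKeyValue() does run without those two locals, since
readFromCurrentBuffer() recomputes the start-tag strings itself. A
self-contained sketch of the same fill-and-scan pattern - hypothetical names,
simplified from the original reader:

```java
import java.io.*;

// Simplified stand-in for the nextKeyValue()/readFromCurrentBuffer() pair:
// keep appending input to a buffer until one complete record can be cut out.
public class TagScanLoop {

    // Returns the next startTag...endTag record, or null at end of stream.
    public static String nextRecord(Reader in, StringBuilder sb,
                                    String startTag, String endTag) throws IOException {
        char[] buf = new char[4096];
        while (true) {
            String rec = extract(sb, startTag, endTag); // scan what we already have
            if (rec != null) return rec;
            int n = in.read(buf);                       // otherwise read more
            if (n <= 0) return null;                    // end of stream, no record
            sb.append(buf, 0, n);
        }
    }

    // Pull one complete startTag...endTag region out of sb; keep the remainder
    // buffered for the next call, as readFromCurrentBuffer() does.
    private static String extract(StringBuilder sb, String startTag, String endTag) {
        String s = sb.toString();
        int start = s.indexOf(startTag);
        if (start == -1) return null;
        int end = s.indexOf(endTag, start);
        if (end == -1) return null;                     // need more input
        end += endTag.length();
        sb.setLength(0);
        sb.append(s, end, s.length());                  // remainder stays buffered
        return s.substring(start, end);
    }

    public static void main(String[] args) throws IOException {
        Reader in = new StringReader("junk<scan>a</scan>mid<scan>b</scan>");
        StringBuilder sb = new StringBuilder();
        System.out.println(nextRecord(in, sb, "<scan>", "</scan>")); // <scan>a</scan>
        System.out.println(nextRecord(in, sb, "<scan>", "</scan>")); // <scan>b</scan>
    }
}
```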


On 11 July 2011 15:11, Steve Lewis <lo...@gmail.com> wrote:

> Look at this sample
> =============================================
> package org.systemsbiology.hadoop;
>
>
>
> import org.apache.hadoop.conf.*;
> import org.apache.hadoop.fs.*;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.io.*;
> import org.apache.hadoop.io.compress.*;
> import org.apache.hadoop.mapreduce.*;
> import org.apache.hadoop.mapreduce.lib.input.*;
>
> import java.io.*;
> import java.util.*;
>
> /**
>  * org.systemsbiology.xtandem.hadoop.XMLTagInputFormat
>  * Splitter that reads scan tags from an XML file
>  * No assumption is made about lines but tage and end tags MUST look like
> <MyTag </MyTag> with no embedded spaces
>  * usually you will subclass and hard code the tag you want to split on
>  */
> public class XMLTagInputFormat extends FileInputFormat<Text, Text> {
>     public static final XMLTagInputFormat[] EMPTY_ARRAY = {};
>
>
>     private static final double SPLIT_SLOP = 1.1;   // 10% slop
>
>
>     public static final int BUFFER_SIZE = 4096;
>
>     private final String m_BaseTag;
>     private final String m_StartTag;
>     private final String m_EndTag;
>     private String m_Extension;
>
>     public XMLTagInputFormat(final String pBaseTag) {
>         m_BaseTag = pBaseTag;
>         m_StartTag = "<" + pBaseTag;
>         m_EndTag = "</" + pBaseTag + ">";
>
>     }
>
>     public String getExtension() {
>         return m_Extension;
>     }
>
>     public void setExtension(final String pExtension) {
>         m_Extension = pExtension;
>     }
>
>     public boolean isSplitReadable(InputSplit split) {
>         if (!(split instanceof FileSplit))
>             return true;
>         FileSplit fsplit = (FileSplit) split;
>         Path path1 = fsplit.getPath();
>         return isPathAcceptable(path1);
>     }
>
>     protected boolean isPathAcceptable(final Path pPath1) {
>         String path = pPath1.toString().toLowerCase();
>         if(path.startsWith("part-r-"))
>             return true;
>         String extension = getExtension();
>         if (extension != null && path.endsWith(extension.toLowerCase()))
>             return true;
>         if (extension != null && path.endsWith(extension.toLowerCase() +
> ".gz"))
>             return true;
>         if (extension == null )
>             return true;
>         return false;
>     }
>
>     public String getStartTag() {
>         return m_StartTag;
>     }
>
>     public String getBaseTag() {
>         return m_BaseTag;
>     }
>
>     public String getEndTag() {
>         return m_EndTag;
>     }
>
>     @Override
>     public RecordReader<Text, Text> createRecordReader(InputSplit split,
>                                                        TaskAttemptContext
> context) {
>         if (isSplitReadable(split))
>             return new MyXMLFileReader();
>         else
>             return NullRecordReader.INSTANCE; // do not read
>     }
>
>     @Override
>     protected boolean isSplitable(JobContext context, Path file) {
>         String fname = file.getName().toLowerCase();
>         if(fname.endsWith(".gz"))
>             return false;
>         return true;
>     }
>
>     /**
>      * Generate the list of files and make them into FileSplits.
>      * This needs to be copied to insert a filter on acceptable data
>      */
>     @Override
>     public List<InputSplit> getSplits(JobContext job
>     ) throws IOException {
>         long minSize = Math.max(getFormatMinSplitSize(),
> getMinSplitSize(job));
>         long maxSize = getMaxSplitSize(job);
>
>         // generate splits
>         List<InputSplit> splits = new ArrayList<InputSplit>();
>         for (FileStatus file : listStatus(job)) {
>             Path path = file.getPath();
>             if (!isPathAcceptable(path))   // filter acceptable data
>                 continue;
>             FileSystem fs = path.getFileSystem(job.getConfiguration());
>             long length = file.getLen();
>             BlockLocation[] blkLocations = fs.getFileBlockLocations(file,
> 0, length);
>             if ((length != 0) && isSplitable(job, path)) {
>                 long blockSize = file.getBlockSize();
>                 long splitSize = computeSplitSize(blockSize, minSize,
> maxSize);
>
>                 long bytesRemaining = length;
>                 while (((double) bytesRemaining) / splitSize > SPLIT_SLOP)
> {
>                     int blkIndex = getBlockIndex(blkLocations, length -
> bytesRemaining);
>                     splits.add(new FileSplit(path, length - bytesRemaining,
> splitSize,
>                             blkLocations[blkIndex].getHosts()));
>                     bytesRemaining -= splitSize;
>                 }
>
>                 if (bytesRemaining != 0) {
>                     splits.add(new FileSplit(path, length - bytesRemaining,
> bytesRemaining,
>                             blkLocations[blkLocations.length -
> 1].getHosts()));
>                 }
>             }
>             else if (length != 0) {
>                 splits.add(new FileSplit(path, 0, length,
> blkLocations[0].getHosts()));
>             }
>             else {
>                 //Create empty hosts array for zero length files
>                 splits.add(new FileSplit(path, 0, length, new String[0]));
>             }
>         }
>     //    LOG.debug("Total # of splits: " + splits.size());
>         return splits;
>     }
>
>     /**
>      * Custom RecordReader which returns the entire file as a
>      * single m_Value with the name as a m_Key
>      * Value is the entire file
>      * Key is the file name
>      */
>     public class MyXMLFileReader extends RecordReader<Text, Text> {
>
>         private CompressionCodecFactory compressionCodecs = null;
>         private long m_Start;
>         private long m_End;
>         private long m_Current;
>         private BufferedReader m_Input;
>         private Text m_Key;
>         private Text m_Value = null;
>         private char[] m_Buffer = new char[BUFFER_SIZE];
>         StringBuilder m_Sb = new StringBuilder();
>
>         public void initialize(InputSplit genericSplit,
>                                TaskAttemptContext context) throws
> IOException {
>             FileSplit split = (FileSplit) genericSplit;
>             Configuration job = context.getConfiguration();
>             m_Sb.setLength(0);
>             m_Start = split.getStart();
>             m_End = m_Start + split.getLength();
>             final Path file = split.getPath();
>             compressionCodecs = new CompressionCodecFactory(job);
>             final CompressionCodec codec =
> compressionCodecs.getCodec(file);
>
>             // open the file and seek to the m_Start of the split
>             FileSystem fs = file.getFileSystem(job);
>             FSDataInputStream fileIn = fs.open(split.getPath());
>             if (codec != null) {
>                 CompressionInputStream inputStream =
> codec.createInputStream(fileIn);
>                 m_Input = new BufferedReader(new
> InputStreamReader(inputStream));
>                 m_End = Long.MAX_VALUE;
>             }
>             else {
>                 m_Input = new BufferedReader(new
> InputStreamReader(fileIn));
>             }
>             m_Current = m_Start;
>             if (m_Key == null) {
>                 m_Key = new Text();
>             }
>             m_Key.set(split.getPath().getName());
>             if (m_Value == null) {
>                 m_Value = new Text();
>             }
>
>         }
>
>         /**
>          * look for a <scan tag then read until it closes
>          *
>          * @return true if there is data
>          * @throws java.io.IOException
>          */
>         public boolean nextKeyValue() throws IOException {
>             if(readFromCurrentBuffer())
>                 return true;
>             int newSize = 0;
>             String startTag = getStartTag() + " ";
>             String startTag2 = getStartTag() + ">";
>             newSize = m_Input.read(m_Buffer);
>
>             while (newSize > 0) {
>                 m_Current += newSize;
>                 m_Sb.append(m_Buffer, 0, newSize);
>                 if( readFromCurrentBuffer())
>                     return true;
>                 newSize = m_Input.read(m_Buffer);
>             }
>             // exit because we are at the m_End
>             if (newSize <= 0) {
>                 m_Key = null;
>                 m_Value = null;
>                 return false;
>             }
>
>             return true;
>         }
>
>         protected boolean readFromCurrentBuffer()
>         {
>             String endTag = getEndTag();
>               String startText = m_Sb.toString();
>             if(!startText.contains(endTag))
>                 return false; // need more read
>             String startTag = getStartTag() + " ";
>              String startTag2 = getStartTag() + ">";
>             int index = startText.indexOf(startTag);
>             if (index == -1)
>                 index = startText.indexOf(startTag2);
>             if(index == -1)
>                 return false;
>             startText = startText.substring(index);
>             m_Sb.setLength(0);
>             m_Sb.append(startText);
>
>             String s = startText;
>             index = s.indexOf(endTag);
>             if (index == -1)
>                 return false; // need more read
>                // throw new IllegalStateException("unmatched tag " +
> getBaseTag());
>             index += endTag.length();
>             String tag = s.substring(0, index).trim();
>             m_Value.set(tag);
>
>             // keep the remaining text to add to the next tag
>             m_Sb.setLength(0);
>             String rest = s.substring(index);
>             m_Sb.append(rest);
>             return true;
>         }
>
>         @Override
>         public Text getCurrentKey() {
>             return m_Key;
>         }
>
>         @Override
>         public Text getCurrentValue() {
>             return m_Value;
>         }
>
>         /**
>          * Get the progress within the split
>          */
>         public float getProgress() {
>             return ((float) m_Current - m_Start) / (m_Start - m_End);
>         }
>
>         public synchronized void close() throws IOException {
>             if (m_Input != null) {
>                 m_Input.close();
>             }
>         }
>     }
> }
>
> =============================================
>
>
> On Mon, Jul 11, 2011 at 11:57 AM, Erik T <er...@gmail.com> wrote:
>
>> Hello everyone,
>>
>> I'm new to Hadoop and I'm trying to figure out how to design a M/R program
>> to parse a file and generate a PMML file as output.
>>
>> What I would like to do is split a file by a keyword instead of a given
>> number of lines, because the location of the split could change from time
>> to time.
>>
>> I'm looking around and was thinking maybe KeyValueTextInputFormat would be
>> the way to go but I'm not finding any clear examples how to use it. So I'm
>> not sure if this is the right choice or not.
>>
>> Here is a basic input example of what I'm working with.
>>
>> [Input file info]
>> more info
>> more info
>> etc.
>> etc.
>> *Keyword*
>> different info
>> different info
>> *Keyword*
>> some more info
>>
>> For the example above, each section can be generated separately from each
>> other. However, within each section, different lines are dependent upon each
>> other to generate a valid PMML file.
>>
>> Can anyone offer a suggestion what type of input format I should use?
>>
>> Thanks for your time
>> Erik
>>
>
>
>
> --
> Steven M. Lewis PhD
> 4221 105th Ave NE
> Kirkland, WA 98033
> 206-384-1340 (cell)
> Skype lordjoe_com
>
>
>

Re: New to hadoop, trying to write a customary file split

Posted by Steve Lewis <lo...@gmail.com>.
Look at this sample
=============================================
package org.systemsbiology.hadoop;



import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;

import java.io.*;
import java.util.*;

/**
 * org.systemsbiology.xtandem.hadoop.XMLTagInputFormat
 * Splitter that reads scan tags from an XML file
 * No assumption is made about lines, but start and end tags MUST look like <MyTag ... </MyTag> with no embedded spaces
 * usually you will subclass and hard code the tag you want to split on
 */
public class XMLTagInputFormat extends FileInputFormat<Text, Text> {
    public static final XMLTagInputFormat[] EMPTY_ARRAY = {};


    private static final double SPLIT_SLOP = 1.1;   // 10% slop


    public static final int BUFFER_SIZE = 4096;

    private final String m_BaseTag;
    private final String m_StartTag;
    private final String m_EndTag;
    private String m_Extension;

    public XMLTagInputFormat(final String pBaseTag) {
        m_BaseTag = pBaseTag;
        m_StartTag = "<" + pBaseTag;
        m_EndTag = "</" + pBaseTag + ">";

    }

    public String getExtension() {
        return m_Extension;
    }

    public void setExtension(final String pExtension) {
        m_Extension = pExtension;
    }

    public boolean isSplitReadable(InputSplit split) {
        if (!(split instanceof FileSplit))
            return true;
        FileSplit fsplit = (FileSplit) split;
        Path path1 = fsplit.getPath();
        return isPathAcceptable(path1);
    }

    protected boolean isPathAcceptable(final Path pPath1) {
        String path = pPath1.toString().toLowerCase();
        if (pPath1.getName().startsWith("part-r-"))   // reducer output files
            return true;
        String extension = getExtension();
        if (extension != null && path.endsWith(extension.toLowerCase()))
            return true;
        if (extension != null && path.endsWith(extension.toLowerCase() + ".gz"))
            return true;
        return extension == null;
    }

    public String getStartTag() {
        return m_StartTag;
    }

    public String getBaseTag() {
        return m_BaseTag;
    }

    public String getEndTag() {
        return m_EndTag;
    }

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split,
                                                       TaskAttemptContext context) {
        if (isSplitReadable(split))
            return new MyXMLFileReader();
        else
            return NullRecordReader.INSTANCE; // do not read
    }

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        String fname = file.getName().toLowerCase();
        if(fname.endsWith(".gz"))
            return false;
        return true;
    }

    /**
     * Generate the list of files and make them into FileSplits.
     * This needs to be copied to insert a filter on acceptable data
     */
    @Override
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
        long maxSize = getMaxSplitSize(job);

        // generate splits
        List<InputSplit> splits = new ArrayList<InputSplit>();
        for (FileStatus file : listStatus(job)) {
            Path path = file.getPath();
            if (!isPathAcceptable(path))   // filter acceptable data
                continue;
            FileSystem fs = path.getFileSystem(job.getConfiguration());
            long length = file.getLen();
            BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
            if ((length != 0) && isSplitable(job, path)) {
                long blockSize = file.getBlockSize();
                long splitSize = computeSplitSize(blockSize, minSize, maxSize);

                long bytesRemaining = length;
                while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
                    int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
                    splits.add(new FileSplit(path, length - bytesRemaining, splitSize,
                            blkLocations[blkIndex].getHosts()));
                    bytesRemaining -= splitSize;
                }

                if (bytesRemaining != 0) {
                    splits.add(new FileSplit(path, length - bytesRemaining, bytesRemaining,
                            blkLocations[blkLocations.length - 1].getHosts()));
                }
            }
            else if (length != 0) {
                splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
            }
            else {
                //Create empty hosts array for zero length files
                splits.add(new FileSplit(path, 0, length, new String[0]));
            }
        }
    //    LOG.debug("Total # of splits: " + splits.size());
        return splits;
    }

    /**
     * Custom RecordReader which returns each complete XML tag
     * as a single m_Value, with the file name as the m_Key
     */
    public class MyXMLFileReader extends RecordReader<Text, Text> {

        private CompressionCodecFactory compressionCodecs = null;
        private long m_Start;
        private long m_End;
        private long m_Current;
        private BufferedReader m_Input;
        private Text m_Key;
        private Text m_Value = null;
        private char[] m_Buffer = new char[BUFFER_SIZE];
        StringBuilder m_Sb = new StringBuilder();

        public void initialize(InputSplit genericSplit,
                               TaskAttemptContext context) throws IOException {
            FileSplit split = (FileSplit) genericSplit;
            Configuration job = context.getConfiguration();
            m_Sb.setLength(0);
            m_Start = split.getStart();
            m_End = m_Start + split.getLength();
            final Path file = split.getPath();
            compressionCodecs = new CompressionCodecFactory(job);
            final CompressionCodec codec = compressionCodecs.getCodec(file);

            // open the file and seek to the m_Start of the split
            FileSystem fs = file.getFileSystem(job);
            FSDataInputStream fileIn = fs.open(split.getPath());
            if (codec != null) {
                CompressionInputStream inputStream = codec.createInputStream(fileIn);
                m_Input = new BufferedReader(new InputStreamReader(inputStream));
                m_End = Long.MAX_VALUE;
            }
            else {
                m_Input = new BufferedReader(new InputStreamReader(fileIn));
            }
            m_Current = m_Start;
            if (m_Key == null) {
                m_Key = new Text();
            }
            m_Key.set(split.getPath().getName());
            if (m_Value == null) {
                m_Value = new Text();
            }

        }

        /**
         * Look for the next start tag, then read until its end tag closes
         *
         * @return true if there is data
         * @throws java.io.IOException
         */
        public boolean nextKeyValue() throws IOException {
            if(readFromCurrentBuffer())
                return true;
            // the tag matching itself happens in readFromCurrentBuffer()
            int newSize = m_Input.read(m_Buffer);

            while (newSize > 0) {
                m_Current += newSize;
                m_Sb.append(m_Buffer, 0, newSize);
                if( readFromCurrentBuffer())
                    return true;
                newSize = m_Input.read(m_Buffer);
            }
            // exit because we are at the m_End; the loop above only ends
            // when read() reports that the input is exhausted
            m_Key = null;
            m_Value = null;
            return false;
        }

        protected boolean readFromCurrentBuffer() {
            String endTag = getEndTag();
            String startText = m_Sb.toString();
            if (!startText.contains(endTag))
                return false; // need more read
            String startTag = getStartTag() + " ";
            String startTag2 = getStartTag() + ">";
            int index = startText.indexOf(startTag);
            if (index == -1)
                index = startText.indexOf(startTag2);
            if (index == -1)
                return false;
            startText = startText.substring(index);
            m_Sb.setLength(0);
            m_Sb.append(startText);

            String s = startText;
            index = s.indexOf(endTag);
            if (index == -1)
                return false; // need more read
                // throw new IllegalStateException("unmatched tag " + getBaseTag());
            index += endTag.length();
            String tag = s.substring(0, index).trim();
            m_Value.set(tag);

            // keep the remaining text to add to the next tag
            m_Sb.setLength(0);
            String rest = s.substring(index);
            m_Sb.append(rest);
            return true;
        }

        @Override
        public Text getCurrentKey() {
            return m_Key;
        }

        @Override
        public Text getCurrentValue() {
            return m_Value;
        }

        /**
         * Get the progress within the split
         */
        public float getProgress() {
            if (m_End == m_Start)
                return 0.0f;
            // fraction of the split consumed so far
            return (m_Current - m_Start) / (float) (m_End - m_Start);
        }

        public synchronized void close() throws IOException {
            if (m_Input != null) {
                m_Input.close();
            }
        }
    }
}

=============================================
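The tag matching in readFromCurrentBuffer() is easier to see in isolation. Below is a minimal standalone sketch of the same buffering idea with no Hadoop dependencies; the TagScanner class and its feed()/nextTag() methods are names invented for this sketch, not part of the code above:

```java
// Standalone sketch of the buffering used by readFromCurrentBuffer():
// characters are appended to a growing buffer, and each call tries to
// carve out the next complete <tag ...>...</tag> element, keeping any
// leftover text for the following call.
class TagScanner {
    private final String startTagOpen;              // e.g. "<scan"
    private final String endTag;                    // e.g. "</scan>"
    private final StringBuilder buffer = new StringBuilder();

    TagScanner(String baseTag) {
        this.startTagOpen = "<" + baseTag;
        this.endTag = "</" + baseTag + ">";
    }

    // Append newly read characters, as nextKeyValue() does after each read().
    void feed(String chunk) {
        buffer.append(chunk);
    }

    // Return the next complete element, or null if more input is needed.
    String nextTag() {
        String text = buffer.toString();
        if (!text.contains(endTag))
            return null;                            // end tag not buffered yet
        int start = text.indexOf(startTagOpen + " ");
        if (start < 0)
            start = text.indexOf(startTagOpen + ">");
        if (start < 0)
            return null;
        text = text.substring(start);               // drop anything before the start tag
        int end = text.indexOf(endTag);
        if (end < 0)
            return null;
        end += endTag.length();
        String tag = text.substring(0, end).trim();
        buffer.setLength(0);
        buffer.append(text.substring(end));         // keep leftover for the next call
        return tag;
    }

    public static void main(String[] args) {
        TagScanner scanner = new TagScanner("scan");
        scanner.feed("junk <scan id=\"1\">a</scan> <sc");
        System.out.println(scanner.nextTag());      // <scan id="1">a</scan>
        System.out.println(scanner.nextTag());      // null - second tag incomplete
        scanner.feed("an id=\"2\">b</scan>");
        System.out.println(scanner.nextTag());      // <scan id="2">b</scan>
    }
}
```

Note how a tag split across two read() calls is simply held in the buffer until its end tag arrives, which is exactly why the Hadoop reader works even when an element straddles a buffer boundary.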

On Mon, Jul 11, 2011 at 11:57 AM, Erik T <er...@gmail.com> wrote:

> Hello everyone,
>
> I'm new to Hadoop and I'm trying to figure out how to design a M/R program
> to parse a file and generate a PMML file as output.
>
> What I would like to do is split a file by a keyword instead of a given number
> of lines, because the location of the split could change from time to time.
>
> I'm looking around and was thinking maybe KeyValueTextInputFormat would be
> the way to go but I'm not finding any clear examples how to use it. So I'm
> not sure if this is the right choice or not.
>
> Here is a basic input example of what I'm working with.
>
> [Input file info]
> more info
> more info
> etc.
> etc.
> *Keyword*
> different info
> different info
> *Keyword*
> some more info
>
> For the example above, each section can be generated separately from each
> other. However, within each section, different lines are dependent upon each
> other to generate a valid PMML file.
>
> Can anyone offer a suggestion what type of input format I should use?
>
> Thanks for your time
> Erik
>



-- 
Steven M. Lewis PhD
4221 105th Ave NE
Kirkland, WA 98033
206-384-1340 (cell)
Skype lordjoe_com