Posted to user@pig.apache.org by ๏̯͡๏ <ÐΞ€ρ@Ҝ>, de...@gmail.com on 2013/09/18 17:19:58 UTC

Pig with CombinedFileInputFormat & CombineFileRecordReader (Not working Pig 0.8)

I am facing an issue with a large number of small files (51MB each). A
typical M/R job in my environment reads at least 2000 files,
which results in 2000 map tasks. Hence I thought of using
CombineFileInputFormat to reduce the number of map tasks. I have written a
custom implementation of CombineFileInputFormat along
with CombineFileRecordReader. Below is a skeleton of the code.


import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class CustomInputFormat extends CombineFileInputFormat<IntWritable, RecordHolder> {

    @Override
    public RecordReader<IntWritable, RecordHolder> createRecordReader(
            final InputSplit split, final TaskAttemptContext taskContext) throws IOException {
        final CombineFileSplit fSplit = (CombineFileSplit) split;
        // CombineFileRecordReader hands out one CustomRecordReader per chunk of the split.
        return new CombineFileRecordReader<IntWritable, RecordHolder>(
                fSplit, taskContext, (Class) CustomRecordReader.class);
    }

    public static class CustomRecordReader extends RecordReader<IntWritable, RecordHolder> {
        // Skeleton only: currentIterator, recordPosition and LOGGER are fields whose
        // declarations and setup are omitted here.
        private IntWritable key = null;
        private RecordHolder value = null;

        // idx is the index of this reader's file within the combined split.
        public CustomRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer idx)
                throws IOException {
            FileSplit fileSplit = new FileSplit(split.getPath(idx), split.getOffset(idx),
                    split.getLength(idx), split.getLocations());
            Path path = fileSplit.getPath();
        }

        @Override
        public void close() throws IOException {
        }

        @Override
        public IntWritable getCurrentKey() throws IOException, InterruptedException {
            return key;
        }

        @Override
        public RecordHolder getCurrentValue() throws IOException, InterruptedException {
            return value;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return 0;
        }

        @Override
        public void initialize(InputSplit arg0, TaskAttemptContext arg1)
                throws IOException, InterruptedException {
        }

        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            boolean dataPresent = false;
            if (currentIterator.hasNext()) {
                Object nextRecord = currentIterator.next();
                key = new IntWritable(recordPosition++);
                value = new RecordHolder(nextRecord);
                dataPresent = true;
            } else {
                LOGGER.info("Reached end of File:" + dataPresent);
            }
            return dataPresent;
        }
    }
}

Within setLocation() of the loader I set the following configuration:
job.getConfiguration().set("mapred.max.split.size", "205847916");
job.getConfiguration().set("mapreduce.job.max.split.locations", "5");

I used the above input format in my custom Pig loader. I ran a Pig script to
load data using the custom loader and dump it.
Input size: 1256 files * 51MB each.

I was expecting at least 3 splits, which would result in 3 map tasks. However,
only one map task was started, and it was responsible for reading all of
the data via the record reader.
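
As a rough sanity check on that expectation, the smallest number of splits a size cap allows is the total input size divided by the cap, rounded up. The sketch below is plain Java and does not use Hadoop or Pig; the class name SplitLowerBound and its method are hypothetical, and it assumes "51MB" means 51 * 1024 * 1024 bytes per file. It only gives a lower bound, not the number of splits any particular InputFormat would actually produce.

```java
// Hypothetical helper for illustration; not part of Hadoop or Pig.
final class SplitLowerBound {

    /** Smallest number of splits that can hold totalBytes if no split exceeds maxSplitBytes. */
    static long minSplits(long totalBytes, long maxSplitBytes) {
        if (totalBytes < 0 || maxSplitBytes <= 0) {
            throw new IllegalArgumentException("sizes must be positive");
        }
        // Integer ceiling division.
        return (totalBytes + maxSplitBytes - 1) / maxSplitBytes;
    }

    public static void main(String[] args) {
        long fileBytes = 51L * 1024 * 1024;   // assumption: "51MB" read as 51 MiB
        long totalBytes = 1256L * fileBytes;  // 1256 input files, as in the post
        long maxSplitBytes = 205_847_916L;    // value set for mapred.max.split.size
        System.out.println("lower bound on splits: " + minSplits(totalBytes, maxSplitBytes));
    }
}
```

Under these assumptions the bound is far above 3, so a single map task suggests that the size cap was not being applied at all.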

Could anyone please show me what I am missing?

I am using Pig 0.8, not Pig 0.11 as mentioned earlier, and I do not have
the liberty to upgrade the Pig version.

I looked at these conversations:
http://grokbase.com/t/pig/dev/107trhy9wd/jira-created-pig-1518-multi-file-input-format-for-loaders
http://www.mail-archive.com/user@pig.apache.org/msg03938.html

They did not seem to solve my issue.

Thanks for suggestions.

Regards,
Deepak




Re: Pig with CombinedFileInputFormat & CombineFileRecordReader (Not working Pig 0.8)

Posted by ๏̯͡๏ <ÐΞ€ρ@Ҝ>, de...@gmail.com.
Yeah. Thanks.
I did observe that Pig combined inputs.


On Tue, Sep 24, 2013 at 11:37 AM, Dmitriy Ryaboy <dv...@gmail.com> wrote:

> Don't use CombinedFile InputFormat / Record Reader. Just let Pig do its
> thing.



-- 
Deepak

Re: Pig with CombinedFileInputFormat & CombineFileRecordReader (Not working Pig 0.8)

Posted by Dmitriy Ryaboy <dv...@gmail.com>.
Don't use CombineFileInputFormat / CombineFileRecordReader. Just let Pig do
its thing.



Re: Pig with CombinedFileInputFormat & CombineFileRecordReader (Not working Pig 0.8)

Posted by ๏̯͡๏ <ÐΞ€ρ@Ҝ>, de...@gmail.com.
I tried this
http://pig.apache.org/docs/r0.8.1/cookbook.html#Combine+Small+Input+Files

Test job details
Input: 7 files * 51MB each

HDFS counters of the job (group: FileSystemCounters)

Counter               Map            Reduce     Total
FILE_BYTES_READ       92             23         115
HDFS_BYTES_READ       360,235,092    0          360,235,092
FILE_BYTES_WRITTEN    116,558        116,349    232,907
HDFS_BYTES_WRITTEN    0              9          9

From the job conf:
pig.maxCombinedSplitSize 205847916
pig.splitCombination true
mapred.max.split.size 205847916 (tried both with and without this setting)

The map task displays these logs from the CombineFileSplit:
Total Split Length:360233853
Total Split Paths Length:7

With 360MB of input data and the above conf, there should have been two
splits. However, there was only one split, and all 7 files were read by a
single map task.
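
The two-split expectation can be checked with a small simulation. The sketch below is plain Java with no Hadoop or Pig dependency; the class GreedySplitPacker is hypothetical and packs whole files in order until the next file would push a split past the cap. It is not the algorithm Pig or CombineFileInputFormat uses (those also consider block locations), and it assumes the seven files are equal in size, which the logged total suggests since 360233853 / 7 = 51461979 exactly.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of Hadoop or Pig.
final class GreedySplitPacker {

    /** Packs whole files, in order, into splits whose total size never exceeds maxSplitBytes. */
    static List<List<Long>> pack(long[] fileSizes, long maxSplitBytes) {
        List<List<Long>> splits = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        long currentBytes = 0;
        for (long size : fileSizes) {
            // Start a new split when adding this file would exceed the cap.
            if (!current.isEmpty() && currentBytes + size > maxSplitBytes) {
                splits.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            // A file larger than the cap still gets a split of its own.
            current.add(size);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            splits.add(current);
        }
        return splits;
    }

    public static void main(String[] args) {
        long[] files = new long[7];
        java.util.Arrays.fill(files, 51_461_979L);  // assumption: equal-sized files
        long maxSplitBytes = 205_847_916L;          // pig.maxCombinedSplitSize from the job conf
        List<List<Long>> splits = pack(files, maxSplitBytes);
        for (int i = 0; i < splits.size(); i++) {
            System.out.println("split " + i + ": " + splits.get(i).size() + " files");
        }
    }
}
```

Note that the configured cap equals exactly four files of that size (4 * 51461979 = 205847916), so this packing puts four files in the first split and three in the second.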


In my loader, prepareToRead does not use the PigSplit:

    @Override
    public void prepareToRead(RecordReader arg0, PigSplit arg1) throws IOException {
        reader = (CombineFileRecordReader) (arg0);
    }

Any suggestions?




-- 
Deepak