Posted to user@pig.apache.org by Andreas Paepcke <pa...@cs.stanford.edu> on 2011/02/16 01:48:36 UTC

Cannot Properly Guide Splits to Nodes

Hi,

The following problem may have one of three root causes. Can someone
tell which interpretation is correct, and what I should do about the
problem?

Summary :
    Either : Controlling split distribution across nodes is impossible,
                because PigInputFormat's getSplits() method mixes up two
                configuration instances,
          or : Maybe PigInputFormat is designed too file-centrically,
          or : I am misusing Pig

Detail:
I am implementing a loader that retrieves historic Web pages from a
remote Web archive. Each split is assigned a range of pages, which the
corresponding mapper independently retrieves from the archive via
sockets. HDFS is not involved until possibly later.

I subclass InputFormat so as to compute a well balanced split list
via:

     public class WbInputFormat extends InputFormat<WbInputSplit,Text> {
           ...
     }

Each split is responsible for roughly the same number of Web pages.
That number is computed so that each physical compute node receives
one split. That is, I divide the total number of pages to be retrieved
by the number of compute nodes.
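
To make the arithmetic concrete, here is a minimal sketch of such a
division. PageRangePlanner and planRanges() are hypothetical names used
only for illustration; they are not part of WbInputFormat, and the real
loader may distribute any remainder differently.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (an assumption, not the actual WbInputFormat code):
// divides totalPages into at most numNodes contiguous ranges whose sizes
// differ by at most one page.
class PageRangePlanner {

    /** Returns one {firstPage, pageCount} pair per non-empty range. */
    static List<long[]> planRanges(long totalPages, int numNodes) {
        if (totalPages < 0 || numNodes <= 0) {
            throw new IllegalArgumentException(
                    "need totalPages >= 0 and numNodes > 0");
        }
        List<long[]> ranges = new ArrayList<>();
        long base = totalPages / numNodes;   // pages every range receives
        long extra = totalPages % numNodes;  // first 'extra' ranges get one more
        long start = 0;
        for (int i = 0; i < numNodes; i++) {
            long count = base + (i < extra ? 1 : 0);
            if (count == 0) {
                break; // fewer pages than nodes: remaining ranges would be empty
            }
            ranges.add(new long[] {start, count});
            start += count;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // 10 pages over 3 nodes yields ranges of 4, 3 and 3 pages.
        for (long[] r : planRanges(10, 3)) {
            System.out.println("first=" + r[0] + " count=" + r[1]);
        }
    }
}
```

Each returned pair would then back one split, so the number of splits
equals the number of compute nodes whenever there are at least as many
pages as nodes.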

All goes according to plan, until after PigInputFormat's

    public List<InputSplit> getSplits(JobContext jobcontext)

method calls my input format's getSplits(), and then calls
PigInputFormat's getPigSplits() method with the result.
(Comments are mine):

    // Call my getSplits() method:
    List<InputSplit> oneInputSplits = inpFormat.getSplits(
            new JobContext(inputSpecificJob.getConfiguration(),
                    jobcontext.getJobID()));
    // Place the splits across nodes:
    List<InputSplit> oneInputPigSplits = getPigSplits(
            oneInputSplits, i, inpTargets.get(i), fs.getDefaultBlockSize(),
            combinable, confClone);

The getPigSplits() method assigns splits to nodes based on a size limit
read from the configuration:

    long maxCombinedSplitSize = conf.getLong("pig.maxCombinedSplitSize", 0);

If the property pig.maxCombinedSplitSize is unavailable in conf, then
maxCombinedSplitSize is set to the HDFS default file block size, which
is of course unrelated to any realities in my scenario.
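
The fallback I am describing can be paraphrased roughly as follows.
This is a sketch, not the actual Pig source: java.util.Properties
stands in for Hadoop's Configuration so that the snippet runs without
Hadoop on the classpath, and the class and method names are made up.

```java
import java.util.Properties;

// Paraphrase of the fallback described above. Properties is a stand-in
// for org.apache.hadoop.conf.Configuration (an assumption made so the
// sketch is self-contained); the names here are hypothetical.
class CombinedSplitSizeSketch {
    static final String KEY = "pig.maxCombinedSplitSize";

    /** Returns the configured limit, or blockSize when the property is absent. */
    static long resolve(Properties conf, long blockSize) {
        // Mirrors conf.getLong(KEY, 0): a missing property reads as 0.
        long configured = Long.parseLong(conf.getProperty(KEY, "0"));
        // 0 means "not set", so the HDFS default block size takes over.
        return configured == 0 ? blockSize : configured;
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        long blockSize = 64L * 1024 * 1024;
        System.out.println(resolve(conf, blockSize)); // block size is used
        conf.setProperty(KEY, "5000");
        System.out.println(resolve(conf, blockSize)); // configured value is used
    }
}
```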

As a remedy, I set the above property in my WbInputFormat's
getSplits() method, using the size of my largest split as the
value:

    public List<InputSplit> getSplits(JobContext context)
        ...
        context.getConfiguration().setLong("pig.maxCombinedSplitSize",
                largestSplit);

This strategy would likely work.

BUT PigInputFormat's getSplits() uses one configuration instance when
calling my getSplits() (to which I add the required property), and
then subsequently passes a *different* conf to getPigSplits().

This switch happens like this in PigInputFormat's getSplits():

   1 A split-specific Job is created like this:

       Configuration confClone = new Configuration(conf);
       Job inputSpecificJob = new Job(confClone);

     Note that during instantiation of inputSpecificJob the conf
     passed into the constructor is cloned again (far up the class
     hierarchy).

   2 My (WbInputFormat's) getSplits() is called like this:

       List<InputSplit> oneInputSplits = inpFormat.getSplits(
                new JobContext(inputSpecificJob.getConfiguration(),
                        jobcontext.getJobID()));

   3 But now getPigSplits() is called like this:

       List<InputSplit> oneInputPigSplits = getPigSplits(
                oneInputSplits, i, inpTargets.get(i),
                fs.getDefaultBlockSize(), combinable, confClone);

The confClone instance is not the one on which I set
"pig.maxCombinedSplitSize": my getSplits() only sees the copy held by
inputSpecificJob. Therefore getPigSplits() falls back to the HDFS block
size as its guide for distributing splits across nodes.
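
The effect can be reproduced without Hadoop. In the sketch below a
HashMap copy stands in for each Configuration copy (both the explicit
"new Configuration(conf)" and the one I believe happens inside the Job
constructor). All class and method names are hypothetical; only the
property name comes from Pig.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in demonstration: each HashMap copy plays the role of a copied
// Configuration. This is an assumption-laden model of steps 1 to 3,
// not Pig or Hadoop code.
class ConfCopyDemo {
    static final String KEY = "pig.maxCombinedSplitSize";

    // Models a copy constructor: an independent snapshot of the settings.
    static Map<String, Long> copyOf(Map<String, Long> conf) {
        return new HashMap<>(conf);
    }

    /** Returns the size limit that the getPigSplits() stand-in ends up using. */
    static long effectiveSize(long largestSplit, long blockSize) {
        Map<String, Long> conf = new HashMap<>();
        // Step 1: confClone is copied from conf, and the Job copies it again.
        Map<String, Long> confClone = copyOf(conf);
        Map<String, Long> jobConf = copyOf(confClone);
        // Step 2: the loader's getSplits() sets the property on the Job's copy.
        jobConf.put(KEY, largestSplit);
        // Step 3: getPigSplits() reads confClone, which never saw the update.
        long configured = confClone.getOrDefault(KEY, 0L);
        return configured == 0 ? blockSize : configured;
    }

    public static void main(String[] args) {
        long blockSize = 64L * 1024 * 1024;
        // Prints the block size, not 5000: the loader's setting is lost.
        System.out.println(effectiveSize(5000L, blockSize));
    }
}
```

If step 3 read from the same copy that step 2 wrote to, the loader's
value would be picked up instead.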

What to do?

Thanks!

Andreas