You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by rd...@apache.org on 2010/08/26 19:54:26 UTC

svn commit: r989849 - in /hadoop/pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/util/ src/org/apache/pig/impl/io/ test/org/apache/pig/test/

Author: rding
Date: Thu Aug 26 17:54:26 2010
New Revision: 989849

URL: http://svn.apache.org/viewvc?rev=989849&view=rev
Log:
PIG-1518: multi file input format for loaders

Added:
    hadoop/pig/trunk/test/org/apache/pig/test/TestSplitCombine.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
    hadoop/pig/trunk/src/org/apache/pig/impl/io/ReadToEndLoader.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=989849&r1=989848&r2=989849&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Aug 26 17:54:26 2010
@@ -26,6 +26,8 @@ PIG-1249: Safe-guards against misconfigu
 
 IMPROVEMENTS
 
+PIG-1518: multi file input format for loaders (yanz via rding)
+
 PIG-1501: need to investigate the impact of compression on pig performance (yanz via thejas)
 
 PIG-1497: Mandatory rule PartitionFilterOptimizer (xuefuz via daijy)

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=989849&r1=989848&r2=989849&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Thu Aug 26 17:54:26 2010
@@ -636,6 +636,19 @@ public class JobControlCompiler{
 
             conf.set(PIG_MAP_STORES, ObjectSerializer.serialize(mapStores));
             conf.set(PIG_REDUCE_STORES, ObjectSerializer.serialize(reduceStores));
+            String tmp;
+            long maxCombinedSplitSize = 0;
+            if (!mro.combineSmallSplits() || pigContext.getProperties().getProperty("pig.splitCombination", "true").equals("false"))
+                conf.setBoolean("pig.noSplitCombination", true);
+            else if ((tmp = pigContext.getProperties().getProperty("pig.maxCombinedSplitSize", null)) != null) {
+                try {
+                    maxCombinedSplitSize = Long.parseLong(tmp);
+                } catch (NumberFormatException e) {
+                    log.warn("Invalid numeric format for pig.maxCombinedSplitSize; use the default maximum combined split size");
+                }
+            }
+            if (maxCombinedSplitSize > 0)
+                conf.setLong("pig.maxCombinedSplitSize", maxCombinedSplitSize);
                         
             // Serialize the UDF specific context info.
             UDFContext.getUDFContext().serialize(conf);

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=989849&r1=989848&r2=989849&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Aug 26 17:54:26 2010
@@ -1572,6 +1572,9 @@ public class MRCompiler extends PhyPlanV
                 MRPlan.connect(rightMROpr, curMROp);
             }
             phyToMROpMap.put(joinOp, curMROp);
+            // no combination of small splits as there is currently no way to guarantee the sortedness
+            // of the combined splits.
+            curMROp.noCombineSmallSplits();
         }
         catch(PlanException e){
             int errCode = 2034;

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=989849&r1=989848&r2=989849&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Thu Aug 26 17:54:26 2010
@@ -141,6 +141,11 @@ public class MapReduceOper extends Opera
 	// Set to true in indexing job generated in map-side cogroup, merge join.
 	private boolean usingTypedComparator = false;
 	
+	// Flag to indicate if the small input splits need to be combined to form a larger
+	// one in order to reduce the number of mappers. For merge join, both tables
+	// are NOT combinable for correctness.
+	private boolean combineSmallSplits = true;
+	
 	private static enum OPER_FEATURE {
 	    NONE,
 	    // Indicate if this job is a sampling job
@@ -458,4 +463,12 @@ public class MapReduceOper extends Opera
     protected void useTypedComparator(boolean useTypedComparator) {
         this.usingTypedComparator = useTypedComparator;
     }
+    
+    protected void noCombineSmallSplits() {
+        combineSmallSplits = false;
+    }
+    
+    public boolean combineSmallSplits() {
+        return combineSmallSplits;
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=989849&r1=989848&r2=989849&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Thu Aug 26 17:54:26 2010
@@ -19,7 +19,11 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.HashSet;
+import java.util.HashMap;
+import java.util.Comparator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -36,8 +40,11 @@ import org.apache.hadoop.mapreduce.Recor
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.pig.ExecType;
 import org.apache.pig.FuncSpec;
+import org.apache.pig.IndexableLoadFunc;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.PigException;
+import org.apache.pig.CollectableLoadFunc;
+import org.apache.pig.OrderedLoadFunc;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
@@ -107,11 +114,8 @@ public class PigInputFormat extends Inpu
         PigInputFormat.sJob = conf;
         
         InputFormat inputFormat = loadFunc.getInputFormat();
-        // now invoke the createRecordReader() with this "adjusted" conf
-        RecordReader reader = inputFormat.createRecordReader(
-                pigSplit.getWrappedSplit(), context);
         
-        return new PigRecordReader(reader, loadFunc, conf);
+        return new PigRecordReader(inputFormat, pigSplit, loadFunc, context);
     }
     
 
@@ -243,6 +247,12 @@ public class PigInputFormat extends Inpu
                 FuncSpec loadFuncSpec = inputs.get(i).getFuncSpec();
                 LoadFunc loadFunc = (LoadFunc) PigContext.instantiateFuncFromSpec(
                         loadFuncSpec);
+                boolean combinable = !(loadFunc instanceof MergeJoinIndexer) &&
+                !(IndexableLoadFunc.class.isAssignableFrom(loadFunc.getClass())) &&
+                !(CollectableLoadFunc.class.isAssignableFrom(loadFunc.getClass()) &&
+                    OrderedLoadFunc.class.isAssignableFrom(loadFunc.getClass()));
+                if (combinable)
+                    combinable = !conf.getBoolean("pig.noSplitCombination", false);
                 Configuration confClone = new Configuration(conf);
                 Job inputSpecificJob = new Job(confClone);
                 // Pass loader signature to LoadFunc and to InputFormat through
@@ -258,8 +268,8 @@ public class PigInputFormat extends Inpu
                 List<InputSplit> oneInputSplits = inpFormat.getSplits(
                         new JobContext(inputSpecificJob.getConfiguration(), 
                                 jobcontext.getJobID()));
-                List<PigSplit> oneInputPigSplits = getPigSplits(
-                        oneInputSplits, i, inpTargets.get(i), conf);
+                List<InputSplit> oneInputPigSplits = getPigSplits(
+                        oneInputSplits, i, inpTargets.get(i), fs.getDefaultBlockSize(), combinable, confClone);
                 splits.addAll(oneInputPigSplits);
             } catch (ExecException ee) {
                 throw ee;
@@ -285,19 +295,40 @@ public class PigInputFormat extends Inpu
         return splits;
     }
 
-    private List<PigSplit> getPigSplits(List<InputSplit> oneInputSplits, 
-            int inputIndex, ArrayList<OperatorKey> targetOps, Configuration conf) {
-        int splitIndex = 0;
-        ArrayList<PigSplit> pigSplits = new ArrayList<PigSplit>();
-        for (InputSplit inputSplit : oneInputSplits) {
-            PigSplit pigSplit = new PigSplit(inputSplit, inputIndex, targetOps,
-                    splitIndex++);
-            pigSplit.setConf(conf);
-            pigSplits.add(pigSplit);
+    protected List<InputSplit> getPigSplits(List<InputSplit> oneInputSplits, 
+            int inputIndex, ArrayList<OperatorKey> targetOps, long blockSize, boolean combinable, Configuration conf)
+            throws IOException, InterruptedException {
+        ArrayList<InputSplit> pigSplits = new ArrayList<InputSplit>();
+        if (!combinable) {
+            int splitIndex = 0;
+            for (InputSplit inputSplit : oneInputSplits) {
+                PigSplit pigSplit = new PigSplit(new InputSplit[] {inputSplit}, inputIndex, targetOps,
+                        splitIndex++);
+                pigSplit.setConf(conf);
+                pigSplits.add(pigSplit);
+            }
+            return pigSplits;
+        } else {
+            long maxCombinedSplitSize = conf.getLong("pig.maxCombinedSplitSize", 0);
+            if (maxCombinedSplitSize== 0)
+                // default is the block size
+                maxCombinedSplitSize = blockSize;
+            List<List<InputSplit>> combinedSplits = 
+                MapRedUtil.getCombinePigSplits(oneInputSplits, maxCombinedSplitSize, conf);
+            for (int i = 0; i < combinedSplits.size(); i++)
+                pigSplits.add(createPigSplit(combinedSplits.get(i), inputIndex, targetOps, i, conf));
+            return pigSplits;
         }
-        return pigSplits;
     }
 
+    private InputSplit createPigSplit(List<InputSplit> combinedSplits,
+        int inputIndex, ArrayList<OperatorKey> targetOps, int splitIndex, Configuration conf)
+    {
+        PigSplit pigSplit = new PigSplit(combinedSplits.toArray(new InputSplit[0]), inputIndex, targetOps, splitIndex);
+        pigSplit.setConf(conf);
+        return pigSplit;
+    }
+    
     public static PigSplit getActiveSplit() {
         return activeSplit;
     }

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java?rev=989849&r1=989848&r2=989849&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java Thu Aug 26 17:54:26 2010
@@ -58,9 +58,9 @@ public class PigRecordReader extends Rec
      */
     Tuple curValue = null;
     
-    // the underlying RecordReader used by the loader
+    // the current wrapped RecordReader used by the loader
     @SuppressWarnings("unchecked")
-    private RecordReader wrappedReader;
+    private RecordReader curReader;
     
     // the loader object
     private LoadFunc loadfunc;
@@ -71,6 +71,19 @@ public class PigRecordReader extends Rec
     // the Hadoop counter name
     transient private String counterName = null;
     
+    // the wrapped inputformat
+    private InputFormat inputformat;
+    
+    // the wrapped splits
+    private PigSplit pigSplit;
+    
+    // the wrapped split index in use
+    private int idx;
+    
+    private long progress;
+    
+    private TaskAttemptContext context;
+    
     /**
      * the Configuration object with data specific to the input the underlying
      * RecordReader will process (this is obtained after a 
@@ -80,19 +93,28 @@ public class PigRecordReader extends Rec
      */
     private Configuration inputSpecificConf;
     /**
-     * @param conf 
+     * @param context 
      * 
      */
-    public PigRecordReader(RecordReader wrappedReader, 
-            LoadFunc loadFunc, Configuration conf) {
-        this.wrappedReader = wrappedReader; 
+    public PigRecordReader(InputFormat inputformat, PigSplit pigSplit, 
+            LoadFunc loadFunc, TaskAttemptContext context) throws IOException, InterruptedException {
+        this.inputformat = inputformat;
+        this.pigSplit = pigSplit; 
         this.loadfunc = loadFunc;
-        this.inputSpecificConf = conf;
+        this.context = context;
+        this.inputSpecificConf = context.getConfiguration();
+        curReader = null;
+        progress = 0;
+        idx = 0;
+        initNextRecordReader();
     }
     
     @Override
     public void close() throws IOException {
-        wrappedReader.close();        
+        if (curReader != null) {
+            curReader.close();
+            curReader = null;
+        }
     }
 
     @Override
@@ -125,7 +147,12 @@ public class PigRecordReader extends Rec
 
     @Override
     public float getProgress() throws IOException, InterruptedException {
-        return wrappedReader.getProgress();
+        long subprogress = 0;    // bytes processed in current split
+        if (null != curReader) {
+            // idx is always one past the current subsplit's true index.
+            subprogress = (long)(curReader.getProgress() * pigSplit.getLength(idx - 1));
+        }
+        return Math.min(1.0f,  (progress + subprogress)/(float)(pigSplit.getLength()));
     }
 
     @Override
@@ -135,7 +162,8 @@ public class PigRecordReader extends Rec
         // object - this is achieved by merging the Context corresponding to 
         // the input split this Reader is supposed to process with the context
         // passed in.
-        PigSplit pigSplit = (PigSplit)split;
+        this.pigSplit = (PigSplit)split;
+        this.context = context;
         ConfigurationUtil.mergeConf(context.getConfiguration(),
                 inputSpecificConf);
         // Pass loader signature to LoadFunc and to InputFormat through
@@ -144,8 +172,10 @@ public class PigRecordReader extends Rec
                 context.getConfiguration());
         // now invoke initialize() on underlying RecordReader with
         // the "adjusted" conf
-        wrappedReader.initialize(pigSplit.getWrappedSplit(), context);
-        loadfunc.prepareToRead(wrappedReader, pigSplit);
+        if (null != curReader) {
+            curReader.initialize(pigSplit.getWrappedSplit(), context);
+            loadfunc.prepareToRead(curReader, pigSplit);
+        }
                 
         if (pigSplit.isMultiInputs()) { 
             counterName = getMultiInputsCounerName(pigSplit, inputSpecificConf);
@@ -154,8 +184,12 @@ public class PigRecordReader extends Rec
 
     @Override
     public boolean nextKeyValue() throws IOException, InterruptedException {
-        curValue = loadfunc.getNext();
-        return curValue != null;
+        while ((curReader == null) || (curValue = loadfunc.getNext()) == null) {
+            if (!initNextRecordReader()) {
+              return false;
+            }
+        }
+        return true;
     }
 
     @SuppressWarnings("unchecked")
@@ -167,4 +201,41 @@ public class PigRecordReader extends Rec
         String fname = inputs.get(pigSplit.getInputIndex()).getFileName();
         return PigStatsUtil.getMultiInputsCounterName(fname);
     }
+    
+    /**
+     * Get the record reader for the next wrapped split in this PigSplit.
+     */
+    protected boolean initNextRecordReader() throws IOException, InterruptedException {
+
+        if (curReader != null) {
+            curReader.close();
+            curReader = null;
+            if (idx > 0) {
+                progress += pigSplit.getLength(idx-1);    // done processing so far
+            }
+        }
+
+        // if all chunks have been processed, nothing more to do.
+        if (idx == pigSplit.getNumPaths()) {
+            return false;
+        }
+
+        // get a record reader for the idx-th chunk
+        try {
+          
+
+            curReader =  inputformat.createRecordReader(pigSplit.getWrappedSplit(idx), context);
+
+            if (idx > 0) {
+                // initialize() for the first RecordReader will be called by MapTask;
+                // we're responsible for initializing subsequent RecordReaders.
+                curReader.initialize(pigSplit.getWrappedSplit(idx), context);
+                loadfunc.prepareToRead(curReader, pigSplit);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException (e);
+        }
+        idx++;
+        return true;
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java?rev=989849&r1=989848&r2=989849&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java Thu Aug 26 17:54:26 2010
@@ -29,6 +29,8 @@ import java.io.OutputStream;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.HashSet;
+import java.lang.StringBuilder;
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
@@ -65,7 +67,7 @@ public class PigSplit extends InputSplit
     private int inputIndex;
     
     // The real InputSplit this split is wrapping
-    private InputSplit wrappedSplit;
+    private InputSplit[] wrappedSplits;
 
     // index of the wrappedSplit in the list of splits returned by
     // InputFormat.getSplits()
@@ -87,20 +89,29 @@ public class PigSplit extends InputSplit
      * total number of splits - required by skew join
      */
     private int totalSplits;
+    
+    /**
+     * total length
+     */
+    private long length = -1;
+    
+    /**
+     * overall locations
+     */
+    String[] locations = null;
 
     // this seems necessary for Hadoop to instatiate this split on the
     // backend
     public PigSplit() {}
     
-    public PigSplit(InputSplit wrappedSplit, int inputIndex, 
+    public PigSplit(InputSplit[] wrappedSplits, int inputIndex, 
             List<OperatorKey> targetOps, int splitIndex) {
-        this.wrappedSplit = wrappedSplit;
+        this.wrappedSplits = wrappedSplits;
         this.inputIndex = inputIndex;
         this.targetOps = new ArrayList<OperatorKey>(targetOps);
         this.splitIndex = splitIndex;
     }
-
-
+    
     public List<OperatorKey> getTargetOps() {
         return new ArrayList<OperatorKey>(targetOps);
     }
@@ -112,17 +123,53 @@ public class PigSplit extends InputSplit
      * @return the wrappedSplit
      */
     public InputSplit getWrappedSplit() {
-        return wrappedSplit;
+        return wrappedSplits[0];
+    }
+    
+    /**
+     * 
+     * @param idx the index into the wrapped splits
+     * @return the specified wrapped split
+     */
+    public InputSplit getWrappedSplit(int idx) {
+        return wrappedSplits[idx];
     }
     
     @Override
     public String[] getLocations() throws IOException, InterruptedException {
-            return wrappedSplit.getLocations();
+        if (locations == null) {
+            HashSet<String> locSet = new HashSet<String>();
+            for (int i = 0; i < wrappedSplits.length; i++)
+            {
+                String[] locs = wrappedSplits[i].getLocations();
+                for (int j = 0; j < locs.length; j++)
+                    locSet.add(locs[j]);
+            }
+            locations = new String[locSet.size()];
+            int i = 0;
+            for (String loc : locSet)
+                locations[i++] = loc;
+        }
+        return locations;
     }
 
     @Override
     public long getLength() throws IOException, InterruptedException {
-        return wrappedSplit.getLength();
+        if (length == -1) {
+            length = 0;
+            for (int i = 0; i < wrappedSplits.length; i++)
+                length += wrappedSplits[i].getLength();
+        }
+        return length;
+    }
+    
+    /**
+     * Return the length of a wrapped split
+     * @param idx the index into the wrapped splits
+     * @return the length of the specified wrapped split
+     */
+    public long getLength(int idx) throws IOException, InterruptedException {
+        return wrappedSplits[idx].getLength();
     }
     
     @SuppressWarnings("unchecked")
@@ -132,15 +179,20 @@ public class PigSplit extends InputSplit
         splitIndex = is.readInt();
         inputIndex = is.readInt();
         targetOps = (ArrayList<OperatorKey>) readObject(is);
+        int splitLen = is.readInt();
         String splitClassName = is.readUTF();
         try {
             Class splitClass = conf.getClassByName(splitClassName);
-            wrappedSplit = (InputSplit)ReflectionUtils.newInstance(splitClass, conf);
             SerializationFactory sf = new SerializationFactory(conf);
             // The correct call sequence for Deserializer is, we shall open, then deserialize, but we shall not close
             Deserializer d = sf.getDeserializer(splitClass);
             d.open((InputStream) is);
-            d.deserialize(wrappedSplit);
+            wrappedSplits = new InputSplit[splitLen];
+            for (int i = 0; i < splitLen; i++)
+            {
+                wrappedSplits[i] = (InputSplit)ReflectionUtils.newInstance(splitClass, conf);
+                d.deserialize(wrappedSplits[i]);
+            }
         } catch (ClassNotFoundException e) {
             throw new IOException(e);
         }
@@ -154,13 +206,17 @@ public class PigSplit extends InputSplit
         os.writeInt(splitIndex);
         os.writeInt(inputIndex);
         writeObject(targetOps, os);
-        os.writeUTF(wrappedSplit.getClass().getName());
+        os.writeInt(wrappedSplits.length);
+        os.writeUTF(wrappedSplits[0].getClass().getName());
         SerializationFactory sf = new SerializationFactory(conf);
         Serializer s = 
-            sf.getSerializer(wrappedSplit.getClass());
-        // The correct call sequence for Serializer is, we shall open, then serialize, but we shall not close
+            sf.getSerializer(wrappedSplits[0].getClass());
         s.open((OutputStream) os);
-        s.serialize(wrappedSplit);
+        for (int i = 0; i < wrappedSplits.length; i++)
+        {
+            // The correct call sequence for Serializer is, we shall open, then serialize, but we shall not close
+            s.serialize(wrappedSplits[i]);
+        }
         
     }
 
@@ -242,6 +298,14 @@ public class PigSplit extends InputSplit
     int getInputIndex() {
         return inputIndex;
     }
+    
+    /**
+     * 
+     * @return the number of wrapped splits
+     */
+    public int getNumPaths() {
+        return wrappedSplits.length;
+    }
 
     /**
      * @return the totalSplits
@@ -263,4 +327,23 @@ public class PigSplit extends InputSplit
         this.totalSplits = totalSplits;
     }
 
+    @Override
+    public String toString() {
+        StringBuilder st = new StringBuilder();
+        st.append("Number of splits :" + wrappedSplits.length+"\n");
+        try {
+            st.append("Total Length = "+ getLength()+"\n");
+            for (int i = 0; i < wrappedSplits.length; i++) {
+                st.append("Input split["+i+"]:\n   Length = "+ wrappedSplits[i].getLength()+"\n  Locations:\n");
+                for (String location :  wrappedSplits[i].getLocations())
+                    st.append("    "+location+"\n");
+                st.append("\n-----------------------\n"); 
+          }
+        } catch (IOException e) {
+          return null;
+        } catch (InterruptedException e) {
+          return null;
+        }
+        return st.toString();
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=989849&r1=989848&r2=989849&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Thu Aug 26 17:54:26 2010
@@ -21,9 +21,12 @@ package org.apache.pig.backend.hadoop.ex
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Comparator;
+import java.util.Collections;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -32,6 +35,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -39,6 +43,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
@@ -222,4 +227,397 @@ public class MapRedUtil {
             return !name.startsWith("_") && !name.startsWith("."); 
         }
     };    
+
+    /*
+     * The following code implements split combination: see PIG-1518.
+     */
+    /**
+     * Orders nodes by ascending {@code length}, i.e. by the number of splits
+     * currently registered on each node.
+     */
+    private static Comparator<Node> nodeComparator = new Comparator<Node>() {
+        @Override
+        public int compare(Node left, Node right) {
+            if (left.length < right.length) {
+                return -1;
+            }
+            return left.length > right.length ? 1 : 0;
+        }
+    };
+    
+    /**
+     * Wraps a raw input split so it can be ordered by size and tracked on every
+     * node it has been registered on.
+     * The natural ordering is by descending split length, with the id as an
+     * ascending tie-breaker; {@code equals} follows that same ordering.
+     */
+    private static final class ComparableSplit implements Comparable<ComparableSplit> {
+        private InputSplit rawInputSplit;
+        private HashSet<Node> nodes;
+        // id used as a tie-breaker when two splits are of equal size.
+        private long id;
+        ComparableSplit(InputSplit split, long id) {
+            rawInputSplit = split;
+            nodes = new HashSet<Node>();
+            this.id = id;
+        }
+
+        /** Records that this split is registered on the given node. */
+        void add(Node node) {
+            nodes.add(node);
+        }
+
+        /** Removes this split from every node it was registered on. */
+        void removeFromNodes() {
+            for (Node node : nodes)
+                node.remove(this);
+        }
+
+        /** Returns the wrapped raw input split. */
+        public InputSplit getSplit() {
+            return rawInputSplit;
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            // instanceof is false for null, so no separate null check is needed
+            if (!(other instanceof ComparableSplit))
+                return false;
+            return (compareTo((ComparableSplit) other) == 0);
+        }
+
+        @Override
+        public int hashCode() {
+            // instances that compare equal always share the same id, so hashing
+            // the id honours the equals/hashCode contract without forcing every
+            // instance into a single bucket
+            return (int) (id ^ (id >>> 32));
+        }
+
+        /**
+         * Compares by split length in descending order, then by id in ascending order.
+         *
+         * @throws RuntimeException wrapping any IOException or InterruptedException
+         *         raised while reading a split length
+         */
+        @Override
+        public int compareTo(ComparableSplit other) {
+            try {
+                long thisLen = rawInputSplit.getLength();
+                long otherLen = other.rawInputSplit.getLength();
+                if (thisLen != otherLen) {
+                    // in descending order; compared directly rather than by
+                    // subtraction so that extreme lengths cannot overflow
+                    return thisLen < otherLen ? 1 : -1;
+                }
+                return id == other.id ? 0 : id < other.id ? -1 : 1;
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            } catch (InterruptedException e) {
+                // keep the interrupt visible to callers further up the stack
+                Thread.currentThread().interrupt();
+                throw new RuntimeException(e);
+            }
+        }
+    }
+      
+    /**
+     * Placeholder split that carries only a settable length and no locations.
+     */
+    private static class DummySplit extends InputSplit {
+        private long length;
+
+        @Override
+        public String[] getLocations() {
+            // an empty array rather than null, so code that iterates over the
+            // locations of an arbitrary split does not fail on this one
+            return new String[0];
+        }
+
+        @Override
+        public long getLength() {
+            return length;
+        }
+
+        /** Sets the length reported by {@link #getLength()}. */
+        public void setLength(long length) {
+            this.length = length;
+        }
+    }
+    
+    /**
+     * Holds the splits registered on one node together with a running count of them.
+     * The split list is sorted lazily; removal relies on the sorted order.
+     */
+    private static class Node {
+        // number of splits currently held in the list
+        private long length = 0;
+        private ArrayList<ComparableSplit> splits;
+        // true only while the list is known to be in natural (sorted) order
+        private boolean sorted;
+
+        Node() throws IOException, InterruptedException {
+            length = 0;
+            splits = new ArrayList<ComparableSplit>();
+            sorted = false;
+        }
+
+        /** Appends a split and marks the list as needing a re-sort. */
+        void add(ComparableSplit split) throws IOException, InterruptedException {
+            splits.add(split);
+            length++;
+            // appending can break the ordering that remove() binary-searches on,
+            // so force the next sort()/remove() to re-sort
+            sorted = false;
+        }
+
+        /** Removes the given split if present, sorting the list first when needed. */
+        void remove(ComparableSplit split) {
+            if (!sorted)
+                sort();
+            int index = Collections.binarySearch(splits, split);
+            if (index >= 0) {
+                splits.remove(index);
+                length--;
+            }
+        }
+
+        /** Sorts the splits into their natural order unless already sorted. */
+        void sort() {
+            if (!sorted) {
+                Collections.sort(splits);
+                sorted = true;
+            }
+        }
+
+        /** Returns the live list of splits (not a copy). */
+        ArrayList<ComparableSplit> getSplits() {
+            return splits;
+        }
+
+        /** Returns the number of splits currently held. */
+        public long getLength() {
+            return length;
+        }
+    }
+  
+    /**
+     * Groups the given input splits into lists of splits that are to be combined.
+     * <ul>
+     * <li>Zero-length splits are skipped; if every split is empty, a single group
+     *     holding the first input split is returned.</li>
+     * <li>A split whose length is at least {@code maxCombinedSplitSize} forms a
+     *     group of its own.</li>
+     * <li>A split that reports no locations cannot be attached to any node and
+     *     also forms a group of its own.</li>
+     * <li>The remaining splits are registered on a node per location. Nodes are
+     *     visited from fewest to most splits; on each node the largest split is
+     *     taken and further splits that still fit are added. A group is emitted
+     *     once its total exceeds half of {@code maxCombinedSplitSize}.</li>
+     * <li>Splits left over afterwards are grouped in sequence; the final leftover
+     *     group is appended to an existing group when it fits, otherwise it
+     *     becomes a new group.</li>
+     * </ul>
+     *
+     * @param oneInputSplits the raw splits of one input
+     * @param maxCombinedSplitSize the size limit used when filling a group
+     * @param conf not referenced by this method
+     * @return the groups of splits to combine
+     * @throws IOException if a split fails to report its length or locations
+     * @throws InterruptedException if a split is interrupted while doing so
+     */
+    public static List<List<InputSplit>> getCombinePigSplits(List<InputSplit>
+        oneInputSplits, long maxCombinedSplitSize, Configuration conf)
+          throws IOException, InterruptedException {
+        ArrayList<Node> nodes = new ArrayList<Node>();
+        HashMap<String, Node> nodeMap = new HashMap<String, Node>();
+        List<List<InputSplit>> result = new ArrayList<List<InputSplit>>();
+        List<Long> resultLengths = new ArrayList<Long>();
+        long comparableSplitId = 0;
+
+        int size = 0, nSplits = oneInputSplits.size();
+        InputSplit lastSplit = null;
+        int emptyCnt = 0;
+        for (InputSplit split : oneInputSplits) {
+            if (split.getLength() == 0) {
+                emptyCnt++;
+                continue;
+            }
+            if (split.getLength() >= maxCombinedSplitSize) {
+                comparableSplitId++;
+                ArrayList<InputSplit> combinedSplits = new ArrayList<InputSplit>();
+                combinedSplits.add(split);
+                result.add(combinedSplits);
+                resultLengths.add(split.getLength());
+            } else {
+                String[] locations = split.getLocations();
+                if (locations == null || locations.length == 0) {
+                    // a split without locations cannot be registered on any node;
+                    // without this branch it would be dropped from the result
+                    // (or fail on a null array), so emit it on its own
+                    ArrayList<InputSplit> combinedSplits = new ArrayList<InputSplit>();
+                    combinedSplits.add(split);
+                    result.add(combinedSplits);
+                    resultLengths.add(split.getLength());
+                    continue;
+                }
+                ComparableSplit csplit = new ComparableSplit(split, comparableSplitId++);
+                HashSet<String> locationSeen = new HashSet<String>();
+                for (String location : locations)
+                {
+                    // register the split once per distinct location
+                    if (!locationSeen.contains(location))
+                    {
+                        Node node = nodeMap.get(location);
+                        if (node == null) {
+                            node = new Node();
+                            nodes.add(node);
+                            nodeMap.put(location, node);
+                        }
+                        node.add(csplit);
+                        csplit.add(node);
+                        locationSeen.add(location);
+                    }
+                }
+                lastSplit = split;
+                size++;
+            }
+        }
+        /* verification code: debug purpose
+        {
+          ArrayList<ComparableSplit> leftoverSplits = new ArrayList<ComparableSplit>();
+          HashSet<InputSplit> seen = new HashSet<InputSplit>();
+          for (Node node : nodes) {
+            if (node.getLength() > 0)
+            {
+              ArrayList<ComparableSplit> splits = node.getSplits();
+              for (ComparableSplit split : splits) {
+                if (!seen.contains(split.getSplit())) {
+                  // remove duplicates. The set has to be on the raw input split not the
+                  // comparable input split as the latter overrides the compareTo method
+                  // so its equality semantics is changed and not we want here
+                  seen.add(split.getSplit());
+                  leftoverSplits.add(split);
+                }
+              }
+            }
+          }
+
+          int combinedSplitLen = 0;
+          for (PigSplit split : result)
+            combinedSplitLen += split.getNumPaths();
+          if (combinedSplitLen + leftoverSplits.size()!= nSplits-emptyCnt) {
+            throw new AssertionError("number of combined splits {"+combinedSplitLen+"+"+leftoverSplits.size()+"-"+size+"} does not match the number of original splits ["+nSplits+"].");
+          }
+        }
+        */
+        if (nSplits > 0 && emptyCnt == nSplits)
+        {
+            // if all splits are empty, add a single empty split as currently an empty directory is
+            // not properly handled somewhere
+            ArrayList<InputSplit> combinedSplits = new ArrayList<InputSplit>();
+            combinedSplits.add(oneInputSplits.get(0));
+            result.add(combinedSplits);
+        }
+        else if (size == 1) {
+            // a single small split has nothing to be combined with
+            ArrayList<InputSplit> combinedSplits = new ArrayList<InputSplit>();
+            combinedSplits.add(lastSplit);
+            result.add(combinedSplits);
+        } else if (size > 1) {
+            // combine small splits
+            Collections.sort(nodes, nodeComparator);
+            DummySplit dummy = new DummySplit();
+            // dummy is used to search for next split of suitable size to be combined
+            ComparableSplit dummyComparableSplit = new ComparableSplit(dummy, -1);
+            for (Node node : nodes) {
+                // sort the splits on this node in descending order
+                node.sort();
+                long totalSize = 0;
+                ArrayList<ComparableSplit> splits = node.getSplits();
+                int idx;
+                int lenSplits;
+                ArrayList<InputSplit> combinedSplits = new ArrayList<InputSplit>();
+                ArrayList<ComparableSplit> combinedComparableSplits = new ArrayList<ComparableSplit>();
+                while (!splits.isEmpty()) {
+                    // always start a group with the largest remaining split
+                    combinedSplits.add(splits.get(0).getSplit());
+                    combinedComparableSplits.add(splits.get(0));
+                    int startIdx = 1;
+                    lenSplits = splits.size();
+                    totalSize += splits.get(0).getSplit().getLength();
+                    long spaceLeft = maxCombinedSplitSize - totalSize;
+                    dummy.setLength(spaceLeft);
+                    // the dummy (id -1) never equals a real split, so binarySearch
+                    // returns -(insertionPoint)-1; convert it to an index into splits
+                    idx = Collections.binarySearch(node.getSplits().subList(startIdx, lenSplits), dummyComparableSplit);
+                    idx = -idx-1+startIdx;
+                    while (idx < lenSplits)
+                    {
+                        long thisLen = splits.get(idx).getSplit().getLength();
+                        combinedSplits.add(splits.get(idx).getSplit());
+                        combinedComparableSplits.add(splits.get(idx));
+                        totalSize += thisLen;
+                        spaceLeft -= thisLen;
+                        if (spaceLeft <= 0)
+                            break;
+                        // find next combinable chunk
+                        startIdx = idx + 1;
+                        if (startIdx >= lenSplits)
+                            break;
+                        dummy.setLength(spaceLeft);
+                        idx = Collections.binarySearch(node.getSplits().subList(startIdx, lenSplits), dummyComparableSplit);
+                        idx = -idx-1+startIdx;
+                    }
+                    if (totalSize > maxCombinedSplitSize/2) {
+                        // group is big enough: emit it and detach its splits from all nodes
+                        result.add(combinedSplits);
+                        resultLengths.add(totalSize);
+                        removeSplits(combinedComparableSplits);
+                        totalSize = 0;
+                        combinedSplits = new ArrayList<InputSplit>();
+                        combinedComparableSplits.clear();
+                        splits = node.getSplits();
+                    } else {
+                        // too small to emit: the splits stay on the node as leftovers
+                        if (combinedSplits.size() != lenSplits)
+                            throw new AssertionError("Combined split logic error!");
+                        break;
+                    }
+                }
+            }
+            // handle leftovers
+            ArrayList<ComparableSplit> leftoverSplits = new ArrayList<ComparableSplit>();
+            HashSet<InputSplit> seen = new HashSet<InputSplit>();
+            for (Node node : nodes) {
+                for (ComparableSplit split : node.getSplits()) {
+                    if (!seen.contains(split.getSplit())) {
+                        // remove duplicates. The set has to be on the raw input split not the
+                        // comparable input split as the latter overrides the compareTo method
+                        // so its equality semantics is changed and not we want here
+                        seen.add(split.getSplit());
+                        leftoverSplits.add(split);
+                    }
+                }
+            }
+
+            /* verification code
+            int combinedSplitLen = 0;
+            for (PigSplit split : result)
+              combinedSplitLen += split.getNumPaths();
+            if (combinedSplitLen + leftoverSplits.size()!= nSplits-emptyCnt)
+              throw new AssertionError("number of combined splits ["+combinedSplitLen+"+"+leftoverSplits.size()+"] does not match the number of original splits ["+nSplits+"].");
+            */
+            if (!leftoverSplits.isEmpty())
+            {
+                long totalSize = 0;
+                ArrayList<InputSplit> combinedSplits = new ArrayList<InputSplit>();
+                ArrayList<ComparableSplit> combinedComparableSplits = new ArrayList<ComparableSplit>();
+
+                int splitLen = leftoverSplits.size();
+                for (int i = 0; i < splitLen; i++)
+                {
+                    ComparableSplit split = leftoverSplits.get(i);
+                    long thisLen = split.getSplit().getLength();
+                    if (totalSize + thisLen >= maxCombinedSplitSize) {
+                        // adding this split would reach the limit: close the current group first
+                        removeSplits(combinedComparableSplits);
+                        result.add(combinedSplits);
+                        resultLengths.add(totalSize);
+                        combinedSplits = new ArrayList<InputSplit>();
+                        combinedComparableSplits.clear();
+                        totalSize = 0;
+                    }
+                    combinedSplits.add(split.getSplit());
+                    combinedComparableSplits.add(split);
+                    totalSize += split.getSplit().getLength();
+                    if (i == splitLen - 1) {
+                        // last piece: it could be very small, try to see it can be squeezed into any existing splits
+                        for (int j =0; j < result.size(); j++)
+                        {
+                            if (resultLengths.get(j) + totalSize <= maxCombinedSplitSize)
+                            {
+                                List<InputSplit> isList = result.get(j);
+                                for (InputSplit csplit : combinedSplits) {
+                                    isList.add(csplit);
+                                }
+                                removeSplits(combinedComparableSplits);
+                                combinedSplits.clear();
+                                break;
+                            }
+                        }
+                        if (!combinedSplits.isEmpty()) {
+                            // last piece can not be squeezed in, create a new combined split for them.
+                            removeSplits(combinedComparableSplits);
+                            result.add(combinedSplits);
+                        }
+                    }
+                }
+            }
+        }
+        /* verification codes
+        int combinedSplitLen = 0;
+        for (PigSplit split : result)
+          combinedSplitLen += split.getNumPaths();
+        if (combinedSplitLen != nSplits-emptyCnt)
+          throw new AssertionError("number of combined splits ["+combinedSplitLen+"] does not match the number of original splits ["+nSplits+"].");
+
+        long totalLen = 0;
+        for (PigSplit split : result)
+          totalLen += split.getLength();
+
+        long origTotalLen = 0;
+        for (InputSplit split : oneInputSplits)
+          origTotalLen += split.getLength();
+        if (totalLen != origTotalLen)
+          throw new AssertionError("The total length ["+totalLen+"] does not match the original ["+origTotalLen+"]");
+        */
+        log.info("Total input paths (combined) to process : " + result.size());
+        return result;
+    }
+    
+    /** Detaches each of the given splits from every node it is registered on. */
+    private static void removeSplits(List<ComparableSplit> splits) {
+        Iterator<ComparableSplit> cursor = splits.iterator();
+        while (cursor.hasNext()) {
+            cursor.next().removeFromNodes();
+        }
+    }
+    
+    /**
+     * Renders the number of splits, their total length, and the length and
+     * locations of each split as multi-line text. Debugging purpose only.
+     *
+     * @param splits the splits to describe
+     * @return the textual description
+     */
+    public String inputSplitToString(InputSplit[] splits) throws IOException, InterruptedException {
+        StringBuilder text = new StringBuilder();
+        text.append("Number of splits :").append(splits.length).append("\n");
+        long totalLength = 0;
+        for (int i = 0; i < splits.length; i++) {
+            totalLength += splits[i].getLength();
+        }
+        text.append("Total Length = ").append(totalLength).append("\n");
+        int index = 0;
+        for (InputSplit split : splits) {
+            text.append("Input split[").append(index).append("]:\n   Length = ")
+                .append(split.getLength()).append("\n  Locations:\n");
+            for (String location : split.getLocations()) {
+                text.append("    ").append(location).append("\n");
+            }
+            text.append("\n-----------------------\n");
+            index++;
+        }
+        return text.toString();
+    }
+    
+    /* verification code: debug purpose only
+    public String inputSplitToString(ArrayList<ComparableSplit> splits) throws IOException, InterruptedException {
+      StringBuilder st = new StringBuilder();
+      st.append("Number of splits :" + splits.size()+"\n");
+      long len = 0;
+      for (ComparableSplit split: splits)
+        len += split.getSplit().getLength();
+      st.append("Total Length = "+ len +"\n");
+      for (int i = 0; i < splits.size(); i++) {
+        st.append("Input split["+i+"]:\n   Length = "+ splits.get(i).getSplit().getLength()+"\n  Locations:\n");
+        for (String location :  splits.get(i).getSplit().getLocations())
+          st.append("    "+location+"\n");
+        st.append("\n-----------------------\n"); 
+      }
+      return st.toString();
+    }
+    */
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/io/ReadToEndLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/io/ReadToEndLoader.java?rev=989849&r1=989848&r2=989849&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/io/ReadToEndLoader.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/io/ReadToEndLoader.java Thu Aug 26 17:54:26 2010
@@ -174,7 +174,7 @@ public class ReadToEndLoader extends Loa
         // create a dummy pigsplit - other than the actual split, the other
         // params are really not needed here where we are just reading the
         // input completely
-        PigSplit pigSplit = new PigSplit(curSplit, -1, 
+        PigSplit pigSplit = new PigSplit(new InputSplit[] {curSplit}, -1, 
                 new ArrayList<OperatorKey>(), -1);
         wrappedLoadFunc.prepareToRead(reader, pigSplit);
         return true;

Added: hadoop/pig/trunk/test/org/apache/pig/test/TestSplitCombine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestSplitCombine.java?rev=989849&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestSplitCombine.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestSplitCombine.java Thu Aug 26 17:54:26 2010
@@ -0,0 +1,441 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.HashSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.impl.plan.OperatorKey;
+
+import junit.framework.Assert;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestSplitCombine {
+    private Configuration conf;
+    private TestPigInputFormat pigInputFormat;
+    private ArrayList<OperatorKey> ok;
+
+    /**
+     * Test-only input format adding a {@code getPigSplits} overload that takes a
+     * {@code Path} in the fourth position.
+     */
+    class TestPigInputFormat extends PigInputFormat {
+        /**
+         * Forwards to the superclass {@code getPigSplits}. The {@code path}
+         * argument is not used; the literal 1000 is passed in its place.
+         */
+        public List<InputSplit> getPigSplits(List<InputSplit> oneInputSplits,
+                        int inputIndex, ArrayList<OperatorKey> targetOps,
+                        Path path, boolean combinable, Configuration conf)
+                        throws IOException, InterruptedException {
+            // NOTE(review): 1000 presumably is a size parameter of the superclass
+            // method; confirm against PigInputFormat.getPigSplits
+            return super.getPigSplits(oneInputSplits, inputIndex, targetOps,
+                            1000, combinable, conf);
+        }
+    }
+
+    /** In-memory split that reports a fixed length and a fixed array of locations. */
+    class DummyInputSplit extends InputSplit {
+        private final String[] locations;
+        private final long length;
+
+        public DummyInputSplit(long len, String[] locs) {
+            this.length = len;
+            this.locations = locs;
+        }
+
+        @Override
+        public String[] getLocations() {
+            return this.locations;
+        }
+
+        @Override
+        public long getLength() {
+            return this.length;
+        }
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        conf = new Configuration();
+        conf.setLong("pig.maxCombinedSplitSize", 1000);
+        pigInputFormat = new TestPigInputFormat();
+        ok = new ArrayList<OperatorKey>();
+        ok.add(new OperatorKey());
+    }
+
+    /**
+     * Combinable splits of 500, 400 and 400 are expected to yield two combined
+     * splits: the first holds the 500 and 400 splits located on l1/l2/l3, the
+     * second holds the remaining 400 split located on l1/l4/l5.
+     */
+    @Test
+    public void test1() throws IOException, InterruptedException {
+        ArrayList<InputSplit> rawSplits = new ArrayList<InputSplit>();
+        rawSplits.add(new DummyInputSplit(500, new String[] {
+                        "l1", "l2", "l3"
+        }));
+        rawSplits.add(new DummyInputSplit(400, new String[] {
+                        "l1", "l2", "l3"
+        }));
+        rawSplits.add(new DummyInputSplit(400, new String[] {
+                        "l1", "l4", "l5"
+        }));
+        List<InputSplit> result = pigInputFormat.getPigSplits(rawSplits, 0, ok,
+                        null, true, conf);
+        // expected value first, so a failure message reports the right numbers
+        Assert.assertEquals(2, result.size());
+        int index = 0;
+        for (InputSplit split : result) {
+            PigSplit pigSplit = (PigSplit) split;
+            int len = pigSplit.getNumPaths();
+            if (index == 0) {
+                Assert.assertEquals(2, len);
+                checkLocations(pigSplit.getLocations(), new String[] {
+                                "l1", "l2", "l3"
+                });
+                Assert.assertEquals(500, pigSplit.getLength(0));
+                Assert.assertEquals(400, pigSplit.getLength(1));
+            }
+            else {
+                Assert.assertEquals(1, len);
+                checkLocations(pigSplit.getLocations(), new String[] {
+                                "l1", "l4", "l5"
+                });
+                Assert.assertEquals(400, pigSplit.getLength(0));
+            }
+            index++;
+        }
+    }
+
+    /**
+     * Combinable splits of 600, 700 and 800 are expected to stay separate and
+     * to come back as three single-path splits in the order 800, 700, 600.
+     */
+    @Test
+    public void test2() throws IOException, InterruptedException {
+        ArrayList<InputSplit> rawSplits = new ArrayList<InputSplit>();
+        rawSplits.add(new DummyInputSplit(600, new String[] {
+                        "l1", "l2", "l3"
+        }));
+        rawSplits.add(new DummyInputSplit(700, new String[] {
+                        "l1", "l2", "l3"
+        }));
+        rawSplits.add(new DummyInputSplit(800, new String[] {
+                        "l1", "l4", "l5"
+        }));
+        List<InputSplit> result = pigInputFormat.getPigSplits(rawSplits, 0, ok,
+                        null, true, conf);
+        // expected value first, so a failure message reports the right numbers
+        Assert.assertEquals(3, result.size());
+        int index = 0;
+        for (InputSplit split : result) {
+            PigSplit pigSplit = (PigSplit) split;
+            int len = pigSplit.getNumPaths();
+            if (index == 0) {
+                checkLocations(pigSplit.getLocations(), new String[] {
+                                "l1", "l4", "l5"
+                });
+                Assert.assertEquals(1, len);
+                Assert.assertEquals(800, pigSplit.getLength(0));
+            }
+            else if (index == 1) {
+                checkLocations(pigSplit.getLocations(), new String[] {
+                                "l1", "l2", "l3"
+                });
+                Assert.assertEquals(1, len);
+                Assert.assertEquals(700, pigSplit.getLength(0));
+            }
+            else {
+                checkLocations(pigSplit.getLocations(), new String[] {
+                                "l1", "l2", "l3"
+                });
+                Assert.assertEquals(1, len);
+                Assert.assertEquals(600, pigSplit.getLength(0));
+            }
+            index++;
+        }
+    }
+
+    /**
+     * Combinable splits of 500, 200 and 100 are expected to collapse into one
+     * combined split of three paths whose locations span l1 through l5.
+     */
+    @Test
+    public void test3() throws IOException, InterruptedException {
+        ArrayList<InputSplit> rawSplits = new ArrayList<InputSplit>();
+        rawSplits.add(new DummyInputSplit(500, new String[] { "l1", "l2", "l3" }));
+        rawSplits.add(new DummyInputSplit(200, new String[] { "l1", "l2", "l3" }));
+        rawSplits.add(new DummyInputSplit(100, new String[] { "l1", "l4", "l5" }));
+
+        List<InputSplit> result = pigInputFormat.getPigSplits(rawSplits, 0, ok,
+                        null, true, conf);
+
+        Assert.assertEquals(1, result.size());
+        for (int i = 0; i < result.size(); i++) {
+            PigSplit combined = (PigSplit) result.get(i);
+            int numPaths = combined.getNumPaths();
+            Assert.assertEquals(3, numPaths);
+            checkLocations(combined.getLocations(),
+                            new String[] { "l1", "l2", "l3", "l4", "l5" });
+            Assert.assertEquals(500, combined.getLength(0));
+            Assert.assertEquals(200, combined.getLength(1));
+            Assert.assertEquals(100, combined.getLength(2));
+        }
+    }
+
+    @Test
+    public void test4() throws IOException, InterruptedException {
+        ArrayList<InputSplit> rawSplits = new ArrayList<InputSplit>();
+        rawSplits.add(new DummyInputSplit(500, new String[] {
+                        "l1", "l2", "l3"
+        }));
+        rawSplits.add(new DummyInputSplit(200, new String[] {
+                        "l1", "l2", "l3"
+        }));
+        rawSplits.add(new DummyInputSplit(100, new String[] {
+                        "l1", "l4", "l5"
+        }));
+        rawSplits.add(new DummyInputSplit(100, new String[] {
+                        "l1", "l2", "l3"
+        }));
+        rawSplits.add(new DummyInputSplit(200, new String[] {
+                        "l1", "l2", "l3"
+        }));
+        rawSplits.add(new DummyInputSplit(500, new String[] {
+                        "l1", "l4", "l5"
+        }));
+        List<InputSplit> result = pigInputFormat.getPigSplits(rawSplits, 0, ok,
+                        null, true, conf);
+        Assert.assertEquals(2, result.size());
+        int idx = 0;
+        for (InputSplit split : result) {
+            PigSplit pigSplit = (PigSplit) split;
+            int len = pigSplit.getNumPaths();
+            if (idx == 0) {
+                Assert.assertEquals(2, len);
+                checkLocations(pigSplit.getLocations(), new String[] {
+                                "l1", "l4", "l5"
+                });
+                Assert.assertEquals(500, pigSplit.getLength(0));
+                Assert.assertEquals(100, pigSplit.getLength(1));
+            }
+            else {
+                Assert.assertEquals(4, len);
+                Assert.assertEquals(500, pigSplit.getLength(0));
+                checkLocations(pigSplit.getLocations(), new String[] {
+                                "l1", "l2", "l3"
+                });
+                Assert.assertEquals(200, pigSplit.getLength(1));
+                Assert.assertEquals(200, pigSplit.getLength(2));
+                Assert.assertEquals(100, pigSplit.getLength(3));
+            }
+            idx++;
+        }
+    }
+
+    @Test
+    public void test5() throws IOException, InterruptedException {
+        ArrayList<InputSplit> rawSplits = new ArrayList<InputSplit>();
+        rawSplits.add(new DummyInputSplit(500, new String[] {
+                        "l1", "l2", "l3"
+        }));
+        rawSplits.add(new DummyInputSplit(400, new String[] {
+                        "l1", "l2", "l3"
+        }));
+        rawSplits.add(new DummyInputSplit(400, new String[] {
+                        "l1", "l4", "l5"
+        }));
+        List<InputSplit> result = pigInputFormat.getPigSplits(rawSplits, 0, ok,
+                        null, false, conf);
+        Assert.assertEquals(3, result.size());
+        int index = 0;
+        for (InputSplit split : result) {
+            PigSplit pigSplit = (PigSplit) split;
+            int len = pigSplit.getNumPaths();
+            if (index == 0) {
+                Assert.assertEquals(1, len);
+                checkLocations(pigSplit.getLocations(), new String[] {
+                                "l1", "l2", "l3"
+                });
+                Assert.assertEquals(500, pigSplit.getLength(0));
+            }
+            else if (index == 1) {
+                Assert.assertEquals(1, len);
+                checkLocations(pigSplit.getLocations(), new String[] {
+                                "l1", "l2", "l3"
+                });
+                Assert.assertEquals(400, pigSplit.getLength(0));
+            }
+            else {
+                Assert.assertEquals(1, len);
+                checkLocations(pigSplit.getLocations(), new String[] {
+                                "l1", "l4", "l5"
+                });
+                Assert.assertEquals(400, pigSplit.getLength(0));
+            }
+            index++;
+        }
+    }
+
+    @Test
+    public void test6() throws IOException, InterruptedException {
+        // Six raw splits, all created with the same three location names, are
+        // supplied in descending order of their first constructor argument
+        // (the value later read back through PigSplit.getLength).
+        final int[] rawSizes = { 600, 500, 400, 300, 200, 100 };
+        ArrayList<InputSplit> rawSplits = new ArrayList<InputSplit>();
+        for (int rawSize : rawSizes) {
+            rawSplits.add(new DummyInputSplit(rawSize, new String[] {
+                            "l1", "l2", "l3"
+            }));
+        }
+
+        List<InputSplit> combined = pigInputFormat.getPigSplits(rawSplits, 0, ok,
+                        null, true, conf);
+
+        // Expected grouping: {600, 400}, {500, 300, 200} and {100}.
+        Assert.assertEquals(3, combined.size());
+        for (int pos = 0; pos < combined.size(); pos++) {
+            PigSplit current = (PigSplit) combined.get(pos);
+            int pathCount = current.getNumPaths();
+            switch (pos) {
+            case 0:
+                Assert.assertEquals(2, pathCount);
+                checkLocations(current.getLocations(), new String[] {
+                                "l1", "l2", "l3"
+                });
+                Assert.assertEquals(600, current.getLength(0));
+                Assert.assertEquals(400, current.getLength(1));
+                break;
+            case 1:
+                Assert.assertEquals(3, pathCount);
+                checkLocations(current.getLocations(), new String[] {
+                                "l1", "l2", "l3"
+                });
+                Assert.assertEquals(500, current.getLength(0));
+                Assert.assertEquals(300, current.getLength(1));
+                Assert.assertEquals(200, current.getLength(2));
+                break;
+            default:
+                Assert.assertEquals(1, pathCount);
+                checkLocations(current.getLocations(), new String[] {
+                                "l1", "l2", "l3"
+                });
+                Assert.assertEquals(100, current.getLength(0));
+                break;
+            }
+        }
+    }
+
+    /**
+     * Feeds six raw splits in ascending order (100 up to 600), all created
+     * with the location names l1, l2 and l3, to
+     * {@code pigInputFormat.getPigSplits} and expects three resulting splits
+     * holding {600, 400}, {500, 300, 200} and {100} respectively.
+     */
+    @Test
+    public void test7() throws IOException, InterruptedException {
+        final int[] ascendingSizes = { 100, 200, 300, 400, 500, 600 };
+        ArrayList<InputSplit> rawSplits = new ArrayList<InputSplit>();
+        for (int rawSize : ascendingSizes) {
+            rawSplits.add(new DummyInputSplit(rawSize, new String[] {
+                            "l1", "l2", "l3"
+            }));
+        }
+
+        List<InputSplit> combined = pigInputFormat.getPigSplits(rawSplits, 0, ok,
+                        null, true, conf);
+        Assert.assertEquals(3, combined.size());
+
+        for (int pos = 0; pos < combined.size(); pos++) {
+            PigSplit current = (PigSplit) combined.get(pos);
+            int pathCount = current.getNumPaths();
+            switch (pos) {
+            case 0:
+                // first result: the 600 and 400 entries
+                Assert.assertEquals(2, pathCount);
+                checkLocations(current.getLocations(), new String[] {
+                                "l1", "l2", "l3"
+                });
+                Assert.assertEquals(600, current.getLength(0));
+                Assert.assertEquals(400, current.getLength(1));
+                break;
+            case 1:
+                // second result: the 500, 300 and 200 entries
+                Assert.assertEquals(3, pathCount);
+                checkLocations(current.getLocations(), new String[] {
+                                "l1", "l2", "l3"
+                });
+                Assert.assertEquals(500, current.getLength(0));
+                Assert.assertEquals(300, current.getLength(1));
+                Assert.assertEquals(200, current.getLength(2));
+                break;
+            default:
+                // last result: the single 100 entry
+                Assert.assertEquals(1, pathCount);
+                checkLocations(current.getLocations(), new String[] {
+                                "l1", "l2", "l3"
+                });
+                Assert.assertEquals(100, current.getLength(0));
+                break;
+            }
+        }
+    }
+
+    /**
+     * Feeds three raw splits (100, 100 and 200) to
+     * {@code pigInputFormat.getPigSplits}. The first two are created with the
+     * location names l1, l2, l3 and the third with l1, l4, l5. The test
+     * expects one resulting split with three paths, lengths in the order
+     * 200, 100, 100, and locations l1 through l5.
+     */
+    @Test
+    public void test8() throws IOException, InterruptedException {
+        ArrayList<InputSplit> rawSplits = new ArrayList<InputSplit>();
+        rawSplits.add(new DummyInputSplit(100, new String[] {
+                        "l1", "l2", "l3"
+        }));
+        rawSplits.add(new DummyInputSplit(100, new String[] {
+                        "l1", "l2", "l3"
+        }));
+        rawSplits.add(new DummyInputSplit(200, new String[] {
+                        "l1", "l4", "l5"
+        }));
+        List<InputSplit> result = pigInputFormat.getPigSplits(rawSplits, 0, ok,
+                        null, true, conf);
+        // Expected value goes first so that a failure message reads correctly.
+        Assert.assertEquals(1, result.size());
+        for (InputSplit split : result) {
+            PigSplit pigSplit = (PigSplit) split;
+            Assert.assertEquals(3, pigSplit.getNumPaths());
+            checkLocations(pigSplit.getLocations(), new String[] {
+                            "l1", "l2", "l3", "l4", "l5"
+            });
+            Assert.assertEquals(200, pigSplit.getLength(0));
+            Assert.assertEquals(100, pigSplit.getLength(1));
+            Assert.assertEquals(100, pigSplit.getLength(2));
+        }
+    }
+
+    /**
+     * Asserts that the number of entries of {@code actual} that also occur in
+     * {@code expected} equals {@code expected.length}.
+     * <p>
+     * NOTE(review): this is a counting check, not a set comparison. Entries of
+     * {@code actual} that are absent from {@code expected} are ignored, and a
+     * repeated entry of {@code actual} is counted each time it appears.
+     *
+     * @param actual   location names reported by the split under test
+     * @param expected location names the test expects
+     */
+    private void checkLocations(String[] actual, String[] expected) {
+        HashSet<String> expectedSet = new HashSet<String>();
+        for (String location : expected) {
+            expectedSet.add(location);
+        }
+        int matched = 0;
+        for (String location : actual) {
+            if (expectedSet.contains(location)) {
+                matched++;
+            }
+        }
+        // Expected value goes first so that a failure message reads correctly.
+        Assert.assertEquals(expected.length, matched);
+    }
+
+}