You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@pig.apache.org by rd...@apache.org on 2010/08/26 19:54:26 UTC
svn commit: r989849 - in /hadoop/pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/util/
src/org/apache/pig/impl/io/ test/org/apache/pig/test/
Author: rding
Date: Thu Aug 26 17:54:26 2010
New Revision: 989849
URL: http://svn.apache.org/viewvc?rev=989849&view=rev
Log:
PIG-1518: multi file input format for loaders
Added:
hadoop/pig/trunk/test/org/apache/pig/test/TestSplitCombine.java
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
hadoop/pig/trunk/src/org/apache/pig/impl/io/ReadToEndLoader.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=989849&r1=989848&r2=989849&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Aug 26 17:54:26 2010
@@ -26,6 +26,8 @@ PIG-1249: Safe-guards against misconfigu
IMPROVEMENTS
+PIG-1518: multi file input format for loaders (yanz via rding)
+
PIG-1501: need to investigate the impact of compression on pig performance (yanz via thejas)
PIG-1497: Mandatory rule PartitionFilterOptimizer (xuefuz via daijy)
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=989849&r1=989848&r2=989849&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Thu Aug 26 17:54:26 2010
@@ -636,6 +636,19 @@ public class JobControlCompiler{
conf.set(PIG_MAP_STORES, ObjectSerializer.serialize(mapStores));
conf.set(PIG_REDUCE_STORES, ObjectSerializer.serialize(reduceStores));
+ String tmp;
+ long maxCombinedSplitSize = 0;
+ if (!mro.combineSmallSplits() || pigContext.getProperties().getProperty("pig.splitCombination", "true").equals("false")) // disabled for this operator (e.g. merge join) or globally via pig.splitCombination=false
+ conf.setBoolean("pig.noSplitCombination", true);
+ else if ((tmp = pigContext.getProperties().getProperty("pig.maxCombinedSplitSize", null)) != null) {
+ try {
+ maxCombinedSplitSize = Long.parseLong(tmp);
+ } catch (NumberFormatException e) {
+ log.warn("Invalid numeric format for pig.maxCombinedSplitSize; use the default maximum combined split size");
+ }
+ }
+ if (maxCombinedSplitSize > 0) // otherwise PigInputFormat falls back to the file system block size
+ conf.setLong("pig.maxCombinedSplitSize", maxCombinedSplitSize);
// Serialize the UDF specific context info.
UDFContext.getUDFContext().serialize(conf);
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=989849&r1=989848&r2=989849&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Aug 26 17:54:26 2010
@@ -1572,6 +1572,9 @@ public class MRCompiler extends PhyPlanV
MRPlan.connect(rightMROpr, curMROp);
}
phyToMROpMap.put(joinOp, curMROp);
+ // Do not combine small splits: there is currently no way to guarantee the sortedness
+ // of the combined splits.
+ curMROp.noCombineSmallSplits();
}
catch(PlanException e){
int errCode = 2034;
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=989849&r1=989848&r2=989849&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Thu Aug 26 17:54:26 2010
@@ -141,6 +141,11 @@ public class MapReduceOper extends Opera
// Set to true in indexing job generated in map-side cogroup, merge join.
private boolean usingTypedComparator = false;
+ // Flag indicating whether small input splits should be combined into larger
+ // ones in order to reduce the number of mappers. For merge join, neither table
+ // is combinable, for correctness.
+ private boolean combineSmallSplits = true;
+
private static enum OPER_FEATURE {
NONE,
// Indicate if this job is a sampling job
@@ -458,4 +463,12 @@ public class MapReduceOper extends Opera
protected void useTypedComparator(boolean useTypedComparator) {
this.usingTypedComparator = useTypedComparator;
}
+
+ protected void noCombineSmallSplits() { // opt this job out of small-split combination
+ combineSmallSplits = false;
+ }
+
+ public boolean combineSmallSplits() { // true unless noCombineSmallSplits() has been called
+ return combineSmallSplits;
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=989849&r1=989848&r2=989849&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Thu Aug 26 17:54:26 2010
@@ -19,7 +19,11 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.HashSet;
+import java.util.HashMap;
+import java.util.Comparator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -36,8 +40,11 @@ import org.apache.hadoop.mapreduce.Recor
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.pig.ExecType;
import org.apache.pig.FuncSpec;
+import org.apache.pig.IndexableLoadFunc;
import org.apache.pig.LoadFunc;
import org.apache.pig.PigException;
+import org.apache.pig.CollectableLoadFunc;
+import org.apache.pig.OrderedLoadFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
@@ -107,11 +114,8 @@ public class PigInputFormat extends Inpu
PigInputFormat.sJob = conf;
InputFormat inputFormat = loadFunc.getInputFormat();
- // now invoke the createRecordReader() with this "adjusted" conf
- RecordReader reader = inputFormat.createRecordReader(
- pigSplit.getWrappedSplit(), context);
- return new PigRecordReader(reader, loadFunc, conf);
+ return new PigRecordReader(inputFormat, pigSplit, loadFunc, context);
}
@@ -243,6 +247,12 @@ public class PigInputFormat extends Inpu
FuncSpec loadFuncSpec = inputs.get(i).getFuncSpec();
LoadFunc loadFunc = (LoadFunc) PigContext.instantiateFuncFromSpec(
loadFuncSpec);
+ boolean combinable = !(loadFunc instanceof MergeJoinIndexer) && // never combine for merge-join indexers, indexable loaders, or loaders that are both collectable and ordered
+ !(IndexableLoadFunc.class.isAssignableFrom(loadFunc.getClass())) &&
+ !(CollectableLoadFunc.class.isAssignableFrom(loadFunc.getClass()) &&
+ OrderedLoadFunc.class.isAssignableFrom(loadFunc.getClass()));
+ if (combinable) // pig.noSplitCombination is a job-wide override
+ combinable = !conf.getBoolean("pig.noSplitCombination", false);
Configuration confClone = new Configuration(conf);
Job inputSpecificJob = new Job(confClone);
// Pass loader signature to LoadFunc and to InputFormat through
@@ -258,8 +268,8 @@ public class PigInputFormat extends Inpu
List<InputSplit> oneInputSplits = inpFormat.getSplits(
new JobContext(inputSpecificJob.getConfiguration(),
jobcontext.getJobID()));
- List<PigSplit> oneInputPigSplits = getPigSplits(
- oneInputSplits, i, inpTargets.get(i), conf);
+ List<InputSplit> oneInputPigSplits = getPigSplits(
+ oneInputSplits, i, inpTargets.get(i), fs.getDefaultBlockSize(), combinable, confClone);
splits.addAll(oneInputPigSplits);
} catch (ExecException ee) {
throw ee;
@@ -285,19 +295,40 @@ public class PigInputFormat extends Inpu
return splits;
}
- private List<PigSplit> getPigSplits(List<InputSplit> oneInputSplits,
- int inputIndex, ArrayList<OperatorKey> targetOps, Configuration conf) {
- int splitIndex = 0;
- ArrayList<PigSplit> pigSplits = new ArrayList<PigSplit>();
- for (InputSplit inputSplit : oneInputSplits) {
- PigSplit pigSplit = new PigSplit(inputSplit, inputIndex, targetOps,
- splitIndex++);
- pigSplit.setConf(conf);
- pigSplits.add(pigSplit);
+ protected List<InputSplit> getPigSplits(List<InputSplit> oneInputSplits,
+ int inputIndex, ArrayList<OperatorKey> targetOps, long blockSize, boolean combinable, Configuration conf)
+ throws IOException, InterruptedException {
+ ArrayList<InputSplit> pigSplits = new ArrayList<InputSplit>();
+ if (!combinable) {
+ int splitIndex = 0;
+ for (InputSplit inputSplit : oneInputSplits) {
+ PigSplit pigSplit = new PigSplit(new InputSplit[] {inputSplit}, inputIndex, targetOps,
+ splitIndex++);
+ pigSplit.setConf(conf);
+ pigSplits.add(pigSplit);
+ }
+ return pigSplits;
+ } else {
+ long maxCombinedSplitSize = conf.getLong("pig.maxCombinedSplitSize", 0);
+ if (maxCombinedSplitSize== 0)
+ // unset (0): default to the file system's block size
+ maxCombinedSplitSize = blockSize;
+ List<List<InputSplit>> combinedSplits =
+ MapRedUtil.getCombinePigSplits(oneInputSplits, maxCombinedSplitSize, conf);
+ for (int i = 0; i < combinedSplits.size(); i++)
+ pigSplits.add(createPigSplit(combinedSplits.get(i), inputIndex, targetOps, i, conf));
+ return pigSplits;
}
- return pigSplits;
}
+ private InputSplit createPigSplit(List<InputSplit> combinedSplits,
+ int inputIndex, ArrayList<OperatorKey> targetOps, int splitIndex, Configuration conf)
+ {
+ PigSplit pigSplit = new PigSplit(combinedSplits.toArray(new InputSplit[0]), inputIndex, targetOps, splitIndex);
+ pigSplit.setConf(conf);
+ return pigSplit;
+ }
+
public static PigSplit getActiveSplit() {
return activeSplit;
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java?rev=989849&r1=989848&r2=989849&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java Thu Aug 26 17:54:26 2010
@@ -58,9 +58,9 @@ public class PigRecordReader extends Rec
*/
Tuple curValue = null;
- // the underlying RecordReader used by the loader
+ // the current wrapped RecordReader used by the loader
@SuppressWarnings("unchecked")
- private RecordReader wrappedReader;
+ private RecordReader curReader;
// the loader object
private LoadFunc loadfunc;
@@ -71,6 +71,19 @@ public class PigRecordReader extends Rec
// the Hadoop counter name
transient private String counterName = null;
+ // the wrapped inputformat
+ private InputFormat inputformat;
+
+ // the PigSplit whose wrapped splits are read one after another
+ private PigSplit pigSplit;
+
+ // index of the next wrapped split to open (one past the split currently being read)
+ private int idx;
+
+ private long progress; // total length of the wrapped splits already fully read
+
+ private TaskAttemptContext context;
+
/**
* the Configuration object with data specific to the input the underlying
* RecordReader will process (this is obtained after a
@@ -80,19 +93,28 @@ public class PigRecordReader extends Rec
*/
private Configuration inputSpecificConf;
/**
- * @param conf
+ * @param context task context used to create and initialize the wrapped record readers
*
*/
- public PigRecordReader(RecordReader wrappedReader,
- LoadFunc loadFunc, Configuration conf) {
- this.wrappedReader = wrappedReader;
+ public PigRecordReader(InputFormat inputformat, PigSplit pigSplit,
+ LoadFunc loadFunc, TaskAttemptContext context) throws IOException, InterruptedException {
+ this.inputformat = inputformat;
+ this.pigSplit = pigSplit;
this.loadfunc = loadFunc;
- this.inputSpecificConf = conf;
+ this.context = context;
+ this.inputSpecificConf = context.getConfiguration();
+ curReader = null;
+ progress = 0;
+ idx = 0;
+ initNextRecordReader();
}
@Override
public void close() throws IOException {
- wrappedReader.close();
+ if (curReader != null) {
+ curReader.close();
+ curReader = null;
+ }
}
@Override
@@ -125,7 +147,12 @@ public class PigRecordReader extends Rec
@Override
public float getProgress() throws IOException, InterruptedException {
- return wrappedReader.getProgress();
+ long subprogress = 0; // bytes processed in current split
+ if (null != curReader) {
+ // idx is always one past the current subsplit's true index.
+ subprogress = (long)(curReader.getProgress() * pigSplit.getLength(idx - 1));
+ }
+ return Math.min(1.0f, (progress + subprogress)/(float)(pigSplit.getLength()));
}
@Override
@@ -135,7 +162,8 @@ public class PigRecordReader extends Rec
// object - this is achieved by merging the Context corresponding to
// the input split this Reader is supposed to process with the context
// passed in.
- PigSplit pigSplit = (PigSplit)split;
+ this.pigSplit = (PigSplit)split;
+ this.context = context;
ConfigurationUtil.mergeConf(context.getConfiguration(),
inputSpecificConf);
// Pass loader signature to LoadFunc and to InputFormat through
@@ -144,8 +172,10 @@ public class PigRecordReader extends Rec
context.getConfiguration());
// now invoke initialize() on underlying RecordReader with
// the "adjusted" conf
- wrappedReader.initialize(pigSplit.getWrappedSplit(), context);
- loadfunc.prepareToRead(wrappedReader, pigSplit);
+ if (null != curReader) {
+ curReader.initialize(pigSplit.getWrappedSplit(), context);
+ loadfunc.prepareToRead(curReader, pigSplit);
+ }
if (pigSplit.isMultiInputs()) {
counterName = getMultiInputsCounerName(pigSplit, inputSpecificConf);
@@ -154,8 +184,12 @@ public class PigRecordReader extends Rec
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
- curValue = loadfunc.getNext();
- return curValue != null;
+ while ((curReader == null) || (curValue = loadfunc.getNext()) == null) {
+ if (!initNextRecordReader()) {
+ return false;
+ }
+ }
+ return true;
}
@SuppressWarnings("unchecked")
@@ -167,4 +201,41 @@ public class PigRecordReader extends Rec
String fname = inputs.get(pigSplit.getInputIndex()).getFileName();
return PigStatsUtil.getMultiInputsCounterName(fname);
}
+
+ /**
+ * Close the current reader and open one for the next wrapped split of this PigSplit; return false when none remain.
+ */
+ protected boolean initNextRecordReader() throws IOException, InterruptedException {
+
+ if (curReader != null) {
+ curReader.close();
+ curReader = null;
+ if (idx > 0) {
+ progress += pigSplit.getLength(idx-1); // done processing so far
+ }
+ }
+
+ // if all chunks have been processed, nothing more to do.
+ if (idx == pigSplit.getNumPaths()) {
+ return false;
+ }
+
+ // get a record reader for the idx-th chunk
+ try {
+
+
+ curReader = inputformat.createRecordReader(pigSplit.getWrappedSplit(idx), context);
+
+ if (idx > 0) {
+ // initialize() for the first RecordReader will be called by MapTask;
+ // we're responsible for initializing subsequent RecordReaders.
+ curReader.initialize(pigSplit.getWrappedSplit(idx), context);
+ loadfunc.prepareToRead(curReader, pigSplit);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException (e);
+ }
+ idx++;
+ return true;
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java?rev=989849&r1=989848&r2=989849&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java Thu Aug 26 17:54:26 2010
@@ -29,6 +29,8 @@ import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
+import java.util.HashSet;
+import java.lang.StringBuilder;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
@@ -65,7 +67,7 @@ public class PigSplit extends InputSplit
private int inputIndex;
// The real InputSplit this split is wrapping
- private InputSplit wrappedSplit;
+ private InputSplit[] wrappedSplits;
// index of the wrappedSplit in the list of splits returned by
// InputFormat.getSplits()
@@ -87,20 +89,29 @@ public class PigSplit extends InputSplit
* total number of splits - required by skew join
*/
private int totalSplits;
+
+ /**
+ * total length of all wrapped splits; -1 until first computed by getLength()
+ */
+ private long length = -1;
+
+ /**
+ * union of the locations of all wrapped splits; null until first computed by getLocations()
+ */
+ String[] locations = null;
// this seems necessary for Hadoop to instatiate this split on the
// backend
public PigSplit() {}
- public PigSplit(InputSplit wrappedSplit, int inputIndex,
+ public PigSplit(InputSplit[] wrappedSplits, int inputIndex,
List<OperatorKey> targetOps, int splitIndex) {
- this.wrappedSplit = wrappedSplit;
+ this.wrappedSplits = wrappedSplits;
this.inputIndex = inputIndex;
this.targetOps = new ArrayList<OperatorKey>(targetOps);
this.splitIndex = splitIndex;
}
-
-
+
public List<OperatorKey> getTargetOps() {
return new ArrayList<OperatorKey>(targetOps);
}
@@ -112,17 +123,53 @@ public class PigSplit extends InputSplit
* @return the wrappedSplit
*/
public InputSplit getWrappedSplit() {
- return wrappedSplit;
+ return wrappedSplits[0];
+ }
+
+ /**
+ * Get one of the wrapped splits.
+ * @param idx the index into the wrapped splits
+ * @return the specified wrapped split
+ */
+ public InputSplit getWrappedSplit(int idx) {
+ return wrappedSplits[idx];
}
@Override
public String[] getLocations() throws IOException, InterruptedException {
- return wrappedSplit.getLocations();
+ if (locations == null) {
+ HashSet<String> locSet = new HashSet<String>();
+ for (int i = 0; i < wrappedSplits.length; i++)
+ {
+ String[] locs = wrappedSplits[i].getLocations();
+ for (int j = 0; j < locs.length; j++)
+ locSet.add(locs[j]);
+ }
+ locations = new String[locSet.size()];
+ int i = 0;
+ for (String loc : locSet)
+ locations[i++] = loc;
+ }
+ return locations;
}
@Override
public long getLength() throws IOException, InterruptedException {
- return wrappedSplit.getLength();
+ if (length == -1) {
+ length = 0;
+ for (int i = 0; i < wrappedSplits.length; i++)
+ length += wrappedSplits[i].getLength();
+ }
+ return length;
+ }
+
+ /**
+ * Return the length of a wrapped split
+ * @param idx the index into the wrapped splits
+ * @return the length of the specified wrapped split
+ */
+ public long getLength(int idx) throws IOException, InterruptedException {
+ return wrappedSplits[idx].getLength();
}
@SuppressWarnings("unchecked")
@@ -132,15 +179,20 @@ public class PigSplit extends InputSplit
splitIndex = is.readInt();
inputIndex = is.readInt();
targetOps = (ArrayList<OperatorKey>) readObject(is);
+ int splitLen = is.readInt();
String splitClassName = is.readUTF();
try {
Class splitClass = conf.getClassByName(splitClassName);
- wrappedSplit = (InputSplit)ReflectionUtils.newInstance(splitClass, conf);
SerializationFactory sf = new SerializationFactory(conf);
// The correct call sequence for Deserializer is, we shall open, then deserialize, but we shall not close
Deserializer d = sf.getDeserializer(splitClass);
d.open((InputStream) is);
- d.deserialize(wrappedSplit);
+ wrappedSplits = new InputSplit[splitLen];
+ for (int i = 0; i < splitLen; i++)
+ {
+ wrappedSplits[i] = (InputSplit)ReflectionUtils.newInstance(splitClass, conf);
+ d.deserialize(wrappedSplits[i]);
+ }
} catch (ClassNotFoundException e) {
throw new IOException(e);
}
@@ -154,13 +206,17 @@ public class PigSplit extends InputSplit
os.writeInt(splitIndex);
os.writeInt(inputIndex);
writeObject(targetOps, os);
- os.writeUTF(wrappedSplit.getClass().getName());
+ os.writeInt(wrappedSplits.length);
+ os.writeUTF(wrappedSplits[0].getClass().getName()); // NOTE: only the first split's class is written; every wrapped split is read back as that class
SerializationFactory sf = new SerializationFactory(conf);
Serializer s =
- sf.getSerializer(wrappedSplit.getClass());
- // The correct call sequence for Serializer is, we shall open, then serialize, but we shall not close
+ sf.getSerializer(wrappedSplits[0].getClass());
s.open((OutputStream) os);
- s.serialize(wrappedSplit);
+ for (int i = 0; i < wrappedSplits.length; i++)
+ {
+ // The correct call sequence for Serializer is, we shall open, then serialize, but we shall not close
+ s.serialize(wrappedSplits[i]);
+ }
}
@@ -242,6 +298,14 @@ public class PigSplit extends InputSplit
int getInputIndex() {
return inputIndex;
}
+
+ /**
+ * Get the number of wrapped splits.
+ * @return the number of wrapped splits
+ */
+ public int getNumPaths() {
+ return wrappedSplits.length;
+ }
/**
* @return the totalSplits
@@ -263,4 +327,23 @@ public class PigSplit extends InputSplit
this.totalSplits = totalSplits;
}
+ @Override
+ public String toString() {
+ StringBuilder st = new StringBuilder();
+ st.append("Number of splits :" + wrappedSplits.length+"\n");
+ try {
+ st.append("Total Length = "+ getLength()+"\n");
+ for (int i = 0; i < wrappedSplits.length; i++) {
+ st.append("Input split["+i+"]:\n Length = "+ wrappedSplits[i].getLength()+"\n Locations:\n");
+ for (String location : wrappedSplits[i].getLocations())
+ st.append(" "+location+"\n");
+ st.append("\n-----------------------\n");
+ }
+ } catch (IOException e) {
+ return null; // NOTE(review): toString() returning null on failure may surprise callers
+ } catch (InterruptedException e) {
+ return null;
+ }
+ return st.toString();
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=989849&r1=989848&r2=989849&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Thu Aug 26 17:54:26 2010
@@ -21,9 +21,12 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Comparator;
+import java.util.Collections;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -32,6 +35,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.pig.FuncSpec;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
@@ -39,6 +43,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
@@ -222,4 +227,397 @@ public class MapRedUtil {
return !name.startsWith("_") && !name.startsWith(".");
}
};
+
+ /* The following code is for split combination: see PIG-1518
+ * Splits smaller than the maximum combined size are grouped, by shared location where possible, into larger combined splits.
+ */
+ private static Comparator<Node> nodeComparator = new Comparator<Node>() { // orders nodes by ascending number of splits
+ @Override
+ public int compare(Node o1, Node o2) {
+ long cmp = o1.length - o2.length;
+ return cmp == 0 ? 0 : cmp < 0 ? -1 : 1;
+ }
+ };
+
+ private static final class ComparableSplit implements Comparable<ComparableSplit> {
+ private InputSplit rawInputSplit;
+ private HashSet<Node> nodes;
+ // id used as a tie-breaker when two splits are of equal size.
+ private long id;
+ ComparableSplit(InputSplit split, long id) {
+ rawInputSplit = split;
+ nodes = new HashSet<Node>();
+ this.id = id;
+ }
+
+ void add(Node node) {
+ nodes.add(node);
+ }
+
+ void removeFromNodes() {
+ for (Node node : nodes)
+ node.remove(this);
+ }
+
+ public InputSplit getSplit() {
+ return rawInputSplit;
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null || !(other instanceof ComparableSplit))
+ return false;
+ return (compareTo((ComparableSplit) other) == 0);
+ }
+
+ @Override
+ public int hashCode() {
+ return 41; // constant hash stays consistent with equals(), which is defined via compareTo()
+ }
+
+ @Override
+ public int compareTo(ComparableSplit other) {
+ try {
+ long cmp = rawInputSplit.getLength() - other.rawInputSplit.getLength();
+ // descending by length; equal lengths are ordered by ascending id
+ return cmp == 0 ? (id == other.id ? 0 : id < other.id ? -1 : 1) : cmp < 0 ? 1 : -1;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private static class DummySplit extends InputSplit { // length-only probe used for binary searches over sorted splits
+ private long length;
+
+ @Override
+ public String[] getLocations() {
+ return null;
+ }
+
+ @Override
+ public long getLength() {
+ return length;
+ }
+
+ public void setLength(long length) {
+ this.length = length;
+ }
+ }
+
+ private static class Node {
+ private long length = 0; // number of splits on this node, not bytes
+ private ArrayList<ComparableSplit> splits;
+ private boolean sorted;
+
+ Node() throws IOException, InterruptedException {
+ length = 0;
+ splits = new ArrayList<ComparableSplit>();
+ sorted = false;
+ }
+
+ void add(ComparableSplit split) throws IOException, InterruptedException {
+ splits.add(split);
+ length++;
+ }
+
+ void remove(ComparableSplit split) {
+ if (!sorted)
+ sort();
+ int index = Collections.binarySearch(splits, split);
+ if (index >= 0) {
+ splits.remove(index);
+ length--;
+ }
+ }
+
+ void sort() {
+ if (!sorted) {
+ Collections.sort(splits);
+ sorted = true;
+ }
+ }
+
+ ArrayList<ComparableSplit> getSplits() {
+ return splits;
+ }
+
+ public long getLength() {
+ return length;
+ }
+ }
+
+ public static List<List<InputSplit>> getCombinePigSplits(List<InputSplit>
+ oneInputSplits, long maxCombinedSplitSize, Configuration conf)
+ throws IOException, InterruptedException {
+ ArrayList<Node> nodes = new ArrayList<Node>();
+ HashMap<String, Node> nodeMap = new HashMap<String, Node>();
+ List<List<InputSplit>> result = new ArrayList<List<InputSplit>>();
+ List<Long> resultLengths = new ArrayList<Long>();
+ long comparableSplitId = 0;
+
+ int size = 0, nSplits = oneInputSplits.size();
+ InputSplit lastSplit = null;
+ int emptyCnt = 0;
+ for (InputSplit split : oneInputSplits) {
+ if (split.getLength() == 0) {
+ emptyCnt++;
+ continue;
+ }
+ if (split.getLength() >= maxCombinedSplitSize) {
+ comparableSplitId++;
+ ArrayList<InputSplit> combinedSplits = new ArrayList<InputSplit>();
+ combinedSplits.add(split);
+ result.add(combinedSplits);
+ resultLengths.add(split.getLength());
+ } else {
+ ComparableSplit csplit = new ComparableSplit(split, comparableSplitId++);
+ String[] locations = split.getLocations();
+ HashSet<String> locationSeen = new HashSet<String>();
+ for (String location : locations)
+ {
+ if (!locationSeen.contains(location))
+ {
+ Node node = nodeMap.get(location);
+ if (node == null) {
+ node = new Node();
+ nodes.add(node);
+ nodeMap.put(location, node);
+ }
+ node.add(csplit);
+ csplit.add(node);
+ locationSeen.add(location);
+ }
+ }
+ lastSplit = split;
+ size++;
+ }
+ }
+ /* verification code: debug purpose
+ {
+ ArrayList<ComparableSplit> leftoverSplits = new ArrayList<ComparableSplit>();
+ HashSet<InputSplit> seen = new HashSet<InputSplit>();
+ for (Node node : nodes) {
+ if (node.getLength() > 0)
+ {
+ ArrayList<ComparableSplit> splits = node.getSplits();
+ for (ComparableSplit split : splits) {
+ if (!seen.contains(split.getSplit())) {
+ // remove duplicates. The set has to be on the raw input split not the
+ // comparable input split as the latter overrides the compareTo method
+ // so its equality semantics is changed and not we want here
+ seen.add(split.getSplit());
+ leftoverSplits.add(split);
+ }
+ }
+ }
+ }
+
+ int combinedSplitLen = 0;
+ for (PigSplit split : result)
+ combinedSplitLen += split.getNumPaths();
+ if (combinedSplitLen + leftoverSplits.size()!= nSplits-emptyCnt) {
+ throw new AssertionError("number of combined splits {"+combinedSplitLen+"+"+leftoverSplits.size()+"-"+size+"} does not match the number of original splits ["+nSplits+"].");
+ }
+ }
+ */
+ if (nSplits > 0 && emptyCnt == nSplits)
+ {
+ // if all splits are empty, add a single empty split as currently an empty directory is
+ // not properly handled somewhere
+ ArrayList<InputSplit> combinedSplits = new ArrayList<InputSplit>();
+ combinedSplits.add(oneInputSplits.get(0));
+ result.add(combinedSplits);
+ }
+ else if (size == 1) {
+ ArrayList<InputSplit> combinedSplits = new ArrayList<InputSplit>();
+ combinedSplits.add(lastSplit);
+ result.add(combinedSplits);
+ } else if (size > 1) {
+ // combine small splits
+ Collections.sort(nodes, nodeComparator);
+ DummySplit dummy = new DummySplit();
+ // dummy is used to search for next split of suitable size to be combined
+ ComparableSplit dummyComparableSplit = new ComparableSplit(dummy, -1);
+ for (Node node : nodes) {
+ // sort the splits on this node in descending order
+ node.sort();
+ long totalSize = 0;
+ ArrayList<ComparableSplit> splits = node.getSplits();
+ int idx;
+ int lenSplits;
+ ArrayList<InputSplit> combinedSplits = new ArrayList<InputSplit>();
+ ArrayList<ComparableSplit> combinedComparableSplits = new ArrayList<ComparableSplit>();
+ while (!splits.isEmpty()) {
+ combinedSplits.add(splits.get(0).getSplit());
+ combinedComparableSplits.add(splits.get(0));
+ int startIdx = 1;
+ lenSplits = splits.size();
+ totalSize += splits.get(0).getSplit().getLength();
+ long spaceLeft = maxCombinedSplitSize - totalSize;
+ dummy.setLength(spaceLeft);
+ idx = Collections.binarySearch(node.getSplits().subList(startIdx, lenSplits), dummyComparableSplit);
+ idx = -idx-1+startIdx;
+ while (idx < lenSplits)
+ {
+ long thisLen = splits.get(idx).getSplit().getLength();
+ combinedSplits.add(splits.get(idx).getSplit());
+ combinedComparableSplits.add(splits.get(idx));
+ totalSize += thisLen;
+ spaceLeft -= thisLen;
+ if (spaceLeft <= 0)
+ break;
+ // find next combinable chunk
+ startIdx = idx + 1;
+ if (startIdx >= lenSplits)
+ break;
+ dummy.setLength(spaceLeft);
+ idx = Collections.binarySearch(node.getSplits().subList(startIdx, lenSplits), dummyComparableSplit);
+ idx = -idx-1+startIdx;
+ }
+ if (totalSize > maxCombinedSplitSize/2) {
+ result.add(combinedSplits);
+ resultLengths.add(totalSize);
+ removeSplits(combinedComparableSplits);
+ totalSize = 0;
+ combinedSplits = new ArrayList<InputSplit>();
+ combinedComparableSplits.clear();
+ splits = node.getSplits();
+ } else {
+ if (combinedSplits.size() != lenSplits)
+ throw new AssertionError("Combined split logic error!");
+ break;
+ }
+ }
+ }
+ // handle leftovers
+ ArrayList<ComparableSplit> leftoverSplits = new ArrayList<ComparableSplit>();
+ HashSet<InputSplit> seen = new HashSet<InputSplit>();
+ for (Node node : nodes) {
+ for (ComparableSplit split : node.getSplits()) {
+ if (!seen.contains(split.getSplit())) {
+ // remove duplicates. The set has to be on the raw input split not the
+ // comparable input split as the latter overrides the compareTo method
+ // so its equality semantics is changed and not we want here
+ seen.add(split.getSplit());
+ leftoverSplits.add(split);
+ }
+ }
+ }
+
+ /* verification code
+ int combinedSplitLen = 0;
+ for (PigSplit split : result)
+ combinedSplitLen += split.getNumPaths();
+ if (combinedSplitLen + leftoverSplits.size()!= nSplits-emptyCnt)
+ throw new AssertionError("number of combined splits ["+combinedSplitLen+"+"+leftoverSplits.size()+"] does not match the number of original splits ["+nSplits+"].");
+ */
+ if (!leftoverSplits.isEmpty())
+ {
+ long totalSize = 0;
+ ArrayList<InputSplit> combinedSplits = new ArrayList<InputSplit>();
+ ArrayList<ComparableSplit> combinedComparableSplits = new ArrayList<ComparableSplit>();
+
+ int splitLen = leftoverSplits.size();
+ for (int i = 0; i < splitLen; i++)
+ {
+ ComparableSplit split = leftoverSplits.get(i);
+ long thisLen = split.getSplit().getLength();
+ if (totalSize + thisLen >= maxCombinedSplitSize) {
+ removeSplits(combinedComparableSplits);
+ result.add(combinedSplits);
+ resultLengths.add(totalSize);
+ combinedSplits = new ArrayList<InputSplit>();
+ combinedComparableSplits.clear();
+ totalSize = 0;
+ }
+ combinedSplits.add(split.getSplit());
+ combinedComparableSplits.add(split);
+ totalSize += split.getSplit().getLength();
+ if (i == splitLen - 1) {
+ // last piece: it could be very small, try to see it can be squeezed into any existing splits
+ for (int j =0; j < result.size(); j++)
+ {
+ if (resultLengths.get(j) + totalSize <= maxCombinedSplitSize)
+ {
+ List<InputSplit> isList = result.get(j);
+ for (InputSplit csplit : combinedSplits) {
+ isList.add(csplit);
+ }
+ removeSplits(combinedComparableSplits);
+ combinedSplits.clear();
+ break;
+ }
+ }
+ if (!combinedSplits.isEmpty()) {
+ // last piece can not be squeezed in, create a new combined split for them.
+ removeSplits(combinedComparableSplits);
+ result.add(combinedSplits);
+ }
+ }
+ }
+ }
+ }
+ /* verification codes
+ int combinedSplitLen = 0;
+ for (PigSplit split : result)
+ combinedSplitLen += split.getNumPaths();
+ if (combinedSplitLen != nSplits-emptyCnt)
+ throw new AssertionError("number of combined splits ["+combinedSplitLen+"] does not match the number of original splits ["+nSplits+"].");
+
+ long totalLen = 0;
+ for (PigSplit split : result)
+ totalLen += split.getLength();
+
+ long origTotalLen = 0;
+ for (InputSplit split : oneInputSplits)
+ origTotalLen += split.getLength();
+ if (totalLen != origTotalLen)
+ throw new AssertionError("The total length ["+totalLen+"] does not match the original ["+origTotalLen+"]");
+ */
+ log.info("Total input paths (combined) to process : " + result.size());
+ return result;
+ }
+
+     /**
+      * Calls {@code removeFromNodes()} on every split in the given list, in list
+      * order (presumably unregistering each split from the per-node split lists
+      * so it cannot be combined a second time -- confirm against ComparableSplit).
+      *
+      * @param splits the splits that have just been placed into a combined split
+      */
+     private static void removeSplits(List<ComparableSplit> splits) {
+         for (ComparableSplit combined : splits) {
+             combined.removeFromNodes();
+         }
+     }
+
+     /**
+      * Builds a human-readable dump of the given splits, for debugging only.
+      * The dump contains the number of splits and their summed length,
+      * followed by each split's length and location list, with a dashed
+      * separator after every split.
+      *
+      * @param splits the splits to describe
+      * @return the textual description
+      * @throws IOException if a split's length or locations cannot be read
+      * @throws InterruptedException if reading split metadata is interrupted
+      */
+     public String inputSplitToString(InputSplit[] splits) throws IOException, InterruptedException {
+         long totalLength = 0;
+         for (InputSplit each : splits) {
+             totalLength += each.getLength();
+         }
+         StringBuilder out = new StringBuilder();
+         out.append("Number of splits :").append(splits.length).append("\n");
+         out.append("Total Length = ").append(totalLength).append("\n");
+         for (int pos = 0; pos < splits.length; pos++) {
+             InputSplit current = splits[pos];
+             out.append("Input split[").append(pos).append("]:\n Length = ")
+                .append(current.getLength()).append("\n Locations:\n");
+             for (String host : current.getLocations()) {
+                 out.append(" ").append(host).append("\n");
+             }
+             out.append("\n-----------------------\n");
+         }
+         return out.toString();
+     }
+
+ /* verification code: debug purpose only
+ public String inputSplitToString(ArrayList<ComparableSplit> splits) throws IOException, InterruptedException {
+ StringBuilder st = new StringBuilder();
+ st.append("Number of splits :" + splits.size()+"\n");
+ long len = 0;
+ for (ComparableSplit split: splits)
+ len += split.getSplit().getLength();
+ st.append("Total Length = "+ len +"\n");
+ for (int i = 0; i < splits.size(); i++) {
+ st.append("Input split["+i+"]:\n Length = "+ splits.get(i).getSplit().getLength()+"\n Locations:\n");
+ for (String location : splits.get(i).getSplit().getLocations())
+ st.append(" "+location+"\n");
+ st.append("\n-----------------------\n");
+ }
+ return st.toString();
+ }
+ */
}
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/io/ReadToEndLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/io/ReadToEndLoader.java?rev=989849&r1=989848&r2=989849&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/io/ReadToEndLoader.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/io/ReadToEndLoader.java Thu Aug 26 17:54:26 2010
@@ -174,7 +174,7 @@ public class ReadToEndLoader extends Loa
// create a dummy pigsplit - other than the actual split, the other
// params are really not needed here where we are just reading the
// input completely
- PigSplit pigSplit = new PigSplit(curSplit, -1,
+ PigSplit pigSplit = new PigSplit(new InputSplit[] {curSplit}, -1,
new ArrayList<OperatorKey>(), -1);
wrappedLoadFunc.prepareToRead(reader, pigSplit);
return true;
Added: hadoop/pig/trunk/test/org/apache/pig/test/TestSplitCombine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestSplitCombine.java?rev=989849&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestSplitCombine.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestSplitCombine.java Thu Aug 26 17:54:26 2010
@@ -0,0 +1,441 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.HashSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.impl.plan.OperatorKey;
+
+import junit.framework.Assert;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+
+ /**
+  * Tests how PigInputFormat.getPigSplits groups raw input splits into PigSplits
+  * when "pig.maxCombinedSplitSize" is set to 1000 (see setUp).
+  */
+ public class TestSplitCombine {
+     private Configuration conf;
+     private TestPigInputFormat pigInputFormat;
+     private ArrayList<OperatorKey> ok;
+
+     /**
+      * PigInputFormat exposing a getPigSplits overload that ignores its Path
+      * argument and delegates to the superclass getPigSplits with a fixed value
+      * of 1000 as the fourth argument (presumably the block size -- confirm
+      * against PigInputFormat). Static: it needs no reference to the test.
+      */
+     static class TestPigInputFormat extends PigInputFormat {
+         public List<InputSplit> getPigSplits(List<InputSplit> oneInputSplits,
+                 int inputIndex, ArrayList<OperatorKey> targetOps,
+                 Path path, boolean combinable, Configuration conf)
+                 throws IOException, InterruptedException {
+             return super.getPigSplits(oneInputSplits, inputIndex, targetOps,
+                     1000, combinable, conf);
+         }
+     }
+
+     /** Minimal InputSplit with a fixed length and a fixed list of hosts. */
+     static class DummyInputSplit extends InputSplit {
+         private final long length;
+         private final String[] locations;
+
+         public DummyInputSplit(long len, String[] locs) {
+             length = len;
+             locations = locs;
+         }
+
+         @Override
+         public long getLength() {
+             return length;
+         }
+
+         @Override
+         public String[] getLocations() {
+             return locations;
+         }
+     }
+
+     @Before
+     public void setUp() throws Exception {
+         conf = new Configuration();
+         conf.setLong("pig.maxCombinedSplitSize", 1000);
+         pigInputFormat = new TestPigInputFormat();
+         ok = new ArrayList<OperatorKey>();
+         ok.add(new OperatorKey());
+     }
+
+     /** 500 and 400 on l1-l3 are combined; the 400 on l1,l4,l5 stays alone. */
+     @Test
+     public void test1() throws IOException, InterruptedException {
+         ArrayList<InputSplit> rawSplits = new ArrayList<InputSplit>();
+         rawSplits.add(dummySplit(500, "l1", "l2", "l3"));
+         rawSplits.add(dummySplit(400, "l1", "l2", "l3"));
+         rawSplits.add(dummySplit(400, "l1", "l4", "l5"));
+         List<InputSplit> result = pigInputFormat.getPigSplits(rawSplits, 0, ok,
+                 null, true, conf);
+         Assert.assertEquals(2, result.size());
+         checkPigSplit(result.get(0), new String[] { "l1", "l2", "l3" }, 500, 400);
+         checkPigSplit(result.get(1), new String[] { "l1", "l4", "l5" }, 400);
+     }
+
+     /** 600, 700 and 800 stay separate and come back largest first. */
+     @Test
+     public void test2() throws IOException, InterruptedException {
+         ArrayList<InputSplit> rawSplits = new ArrayList<InputSplit>();
+         rawSplits.add(dummySplit(600, "l1", "l2", "l3"));
+         rawSplits.add(dummySplit(700, "l1", "l2", "l3"));
+         rawSplits.add(dummySplit(800, "l1", "l4", "l5"));
+         List<InputSplit> result = pigInputFormat.getPigSplits(rawSplits, 0, ok,
+                 null, true, conf);
+         Assert.assertEquals(3, result.size());
+         checkPigSplit(result.get(0), new String[] { "l1", "l4", "l5" }, 800);
+         checkPigSplit(result.get(1), new String[] { "l1", "l2", "l3" }, 700);
+         checkPigSplit(result.get(2), new String[] { "l1", "l2", "l3" }, 600);
+     }
+
+     /** 500, 200 and 100 end up in one split spanning all five hosts. */
+     @Test
+     public void test3() throws IOException, InterruptedException {
+         ArrayList<InputSplit> rawSplits = new ArrayList<InputSplit>();
+         rawSplits.add(dummySplit(500, "l1", "l2", "l3"));
+         rawSplits.add(dummySplit(200, "l1", "l2", "l3"));
+         rawSplits.add(dummySplit(100, "l1", "l4", "l5"));
+         List<InputSplit> result = pigInputFormat.getPigSplits(rawSplits, 0, ok,
+                 null, true, conf);
+         Assert.assertEquals(1, result.size());
+         checkPigSplit(result.get(0), new String[] { "l1", "l2", "l3", "l4", "l5" },
+                 500, 200, 100);
+     }
+
+     /** Six splits on two host groups yield one combined split per group. */
+     @Test
+     public void test4() throws IOException, InterruptedException {
+         ArrayList<InputSplit> rawSplits = new ArrayList<InputSplit>();
+         rawSplits.add(dummySplit(500, "l1", "l2", "l3"));
+         rawSplits.add(dummySplit(200, "l1", "l2", "l3"));
+         rawSplits.add(dummySplit(100, "l1", "l4", "l5"));
+         rawSplits.add(dummySplit(100, "l1", "l2", "l3"));
+         rawSplits.add(dummySplit(200, "l1", "l2", "l3"));
+         rawSplits.add(dummySplit(500, "l1", "l4", "l5"));
+         List<InputSplit> result = pigInputFormat.getPigSplits(rawSplits, 0, ok,
+                 null, true, conf);
+         Assert.assertEquals(2, result.size());
+         checkPigSplit(result.get(0), new String[] { "l1", "l4", "l5" }, 500, 100);
+         checkPigSplit(result.get(1), new String[] { "l1", "l2", "l3" },
+                 500, 200, 200, 100);
+     }
+
+     /** With combinable == false each raw split becomes its own PigSplit, in input order. */
+     @Test
+     public void test5() throws IOException, InterruptedException {
+         ArrayList<InputSplit> rawSplits = new ArrayList<InputSplit>();
+         rawSplits.add(dummySplit(500, "l1", "l2", "l3"));
+         rawSplits.add(dummySplit(400, "l1", "l2", "l3"));
+         rawSplits.add(dummySplit(400, "l1", "l4", "l5"));
+         List<InputSplit> result = pigInputFormat.getPigSplits(rawSplits, 0, ok,
+                 null, false, conf);
+         Assert.assertEquals(3, result.size());
+         checkPigSplit(result.get(0), new String[] { "l1", "l2", "l3" }, 500);
+         checkPigSplit(result.get(1), new String[] { "l1", "l2", "l3" }, 400);
+         checkPigSplit(result.get(2), new String[] { "l1", "l4", "l5" }, 400);
+     }
+
+     /** 600..100 (descending input) on one host group pack into {600,400}, {500,300,200}, {100}. */
+     @Test
+     public void test6() throws IOException, InterruptedException {
+         ArrayList<InputSplit> rawSplits = new ArrayList<InputSplit>();
+         rawSplits.add(dummySplit(600, "l1", "l2", "l3"));
+         rawSplits.add(dummySplit(500, "l1", "l2", "l3"));
+         rawSplits.add(dummySplit(400, "l1", "l2", "l3"));
+         rawSplits.add(dummySplit(300, "l1", "l2", "l3"));
+         rawSplits.add(dummySplit(200, "l1", "l2", "l3"));
+         rawSplits.add(dummySplit(100, "l1", "l2", "l3"));
+         List<InputSplit> result = pigInputFormat.getPigSplits(rawSplits, 0, ok,
+                 null, true, conf);
+         Assert.assertEquals(3, result.size());
+         checkPigSplit(result.get(0), new String[] { "l1", "l2", "l3" }, 600, 400);
+         checkPigSplit(result.get(1), new String[] { "l1", "l2", "l3" }, 500, 300, 200);
+         checkPigSplit(result.get(2), new String[] { "l1", "l2", "l3" }, 100);
+     }
+
+     /** Same as test6 but with the input in ascending order; the result must be identical. */
+     @Test
+     public void test7() throws IOException, InterruptedException {
+         ArrayList<InputSplit> rawSplits = new ArrayList<InputSplit>();
+         rawSplits.add(dummySplit(100, "l1", "l2", "l3"));
+         rawSplits.add(dummySplit(200, "l1", "l2", "l3"));
+         rawSplits.add(dummySplit(300, "l1", "l2", "l3"));
+         rawSplits.add(dummySplit(400, "l1", "l2", "l3"));
+         rawSplits.add(dummySplit(500, "l1", "l2", "l3"));
+         rawSplits.add(dummySplit(600, "l1", "l2", "l3"));
+         List<InputSplit> result = pigInputFormat.getPigSplits(rawSplits, 0, ok,
+                 null, true, conf);
+         Assert.assertEquals(3, result.size());
+         checkPigSplit(result.get(0), new String[] { "l1", "l2", "l3" }, 600, 400);
+         checkPigSplit(result.get(1), new String[] { "l1", "l2", "l3" }, 500, 300, 200);
+         checkPigSplit(result.get(2), new String[] { "l1", "l2", "l3" }, 100);
+     }
+
+     /** Small splits on different host groups end up in one split, largest first. */
+     @Test
+     public void test8() throws IOException, InterruptedException {
+         ArrayList<InputSplit> rawSplits = new ArrayList<InputSplit>();
+         rawSplits.add(dummySplit(100, "l1", "l2", "l3"));
+         rawSplits.add(dummySplit(100, "l1", "l2", "l3"));
+         rawSplits.add(dummySplit(200, "l1", "l4", "l5"));
+         List<InputSplit> result = pigInputFormat.getPigSplits(rawSplits, 0, ok,
+                 null, true, conf);
+         Assert.assertEquals(1, result.size());
+         checkPigSplit(result.get(0), new String[] { "l1", "l2", "l3", "l4", "l5" },
+                 200, 100, 100);
+     }
+
+     /** Creates a DummyInputSplit of the given length on the given hosts. */
+     private static InputSplit dummySplit(long len, String... locs) {
+         return new DummyInputSplit(len, locs);
+     }
+
+     /**
+      * Asserts that split is a PigSplit wrapping exactly expectedLengths.length
+      * raw splits with the given lengths (in order), located on exactly the
+      * expected hosts.
+      */
+     private void checkPigSplit(InputSplit split, String[] expectedLocations,
+             long... expectedLengths) throws IOException, InterruptedException {
+         PigSplit pigSplit = (PigSplit) split;
+         Assert.assertEquals(expectedLengths.length, pigSplit.getNumPaths());
+         checkLocations(pigSplit.getLocations(), expectedLocations);
+         for (int i = 0; i < expectedLengths.length; i++) {
+             Assert.assertEquals(expectedLengths[i], pigSplit.getLength(i));
+         }
+     }
+
+     /**
+      * Asserts that actual lists exactly the expected hosts, ignoring order.
+      * Unlike a simple membership count, this also rejects extra hosts and
+      * duplicated entries.
+      */
+     private void checkLocations(String[] actual, String[] expected) {
+         HashSet<String> expectedSet = new HashSet<String>();
+         for (String host : expected) {
+             expectedSet.add(host);
+         }
+         HashSet<String> actualSet = new HashSet<String>();
+         for (String host : actual) {
+             actualSet.add(host);
+         }
+         Assert.assertEquals(expected.length, actual.length);
+         Assert.assertEquals(expectedSet, actualSet);
+     }
+
+ }