You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/07/10 00:20:46 UTC
svn commit: r675362 - in /incubator/pig/branches/types:
src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/backend/local/executionengine/ src/org/apache/pig/data/
src/org/apache/pig/impl/builtin/ src/org/apache/pig/impl/logicalLayer/...
Author: gates
Date: Wed Jul 9 15:20:46 2008
New Revision: 675362
URL: http://svn.apache.org/viewvc?rev=675362&view=rev
Log:
PIG-292 Fixes many sort issues. Primary among them is that DataByteArray.compareTo now works properly and the keyComparatorClass for the Hadoop job is
now chosen correctly in order-by cases.
Modified:
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
incubator/pig/branches/types/src/org/apache/pig/data/DataByteArray.java
incubator/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogToPhyTranslationVisitor.java
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/Launcher.java
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/LocalLauncher.java
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapReduce.java
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/MROpPlanVisitor.java
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/MRPrinter.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSort.java
incubator/pig/branches/types/test/org/apache/pig/test/TestDataModel.java
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=675362&r1=675361&r2=675362&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Wed Jul 9 15:20:46 2008
@@ -235,6 +235,8 @@
public ExecJob execute(PhysicalPlan plan,
String jobName) throws ExecException {
try {
+ FileSpec spec = checkLeafIsStore(plan);
+ /*
PhysicalOperator leaf = (PhysicalOperator)plan.getLeaves().get(0);
FileSpec spec = null;
if(!(leaf instanceof POStore)){
@@ -251,6 +253,7 @@
else{
spec = ((POStore)leaf).getSFile();
}
+ */
MapReduceLauncher launcher = new MapReduceLauncher();
boolean success = launcher.launchPig(plan, jobName, pigContext);
@@ -277,8 +280,13 @@
try {
PlanPrinter printer = new PlanPrinter(plan);
printer.visit();
- System.out.println();
- } catch (VisitorException ve) {
+ stream.println();
+
+ checkLeafIsStore(plan);
+
+ MapReduceLauncher launcher = new MapReduceLauncher();
+ launcher.explain(plan, pigContext, stream);
+ } catch (Exception ve) {
throw new RuntimeException(ve);
}
}
@@ -459,7 +467,31 @@
InetAddress.getByName(parts[0]);
return parts[0] + ":" + parts[1];
}
-
+
+ private FileSpec checkLeafIsStore(PhysicalPlan plan) throws ExecException {
+ try {
+ PhysicalOperator leaf = (PhysicalOperator)plan.getLeaves().get(0);
+ FileSpec spec = null;
+ if(!(leaf instanceof POStore)){
+ String scope = leaf.getOperatorKey().getScope();
+ POStore str = new POStore(new OperatorKey(scope,
+ NodeIdGenerator.getGenerator().getNextNodeId(scope)));
+ str.setPc(pigContext);
+ spec = new FileSpec(FileLocalizer.getTemporaryPath(null,
+ pigContext).toString(),
+ BinStorage.class.getName());
+ str.setSFile(spec);
+ plan.addAsLeaf(str);
+ } else{
+ spec = ((POStore)leaf).getSFile();
+ }
+ return spec;
+ } catch (Exception e) {
+ throw new ExecException(e);
+ }
+ }
+
+
}
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java?rev=675362&r1=675361&r2=675362&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java Wed Jul 9 15:20:46 2008
@@ -50,6 +50,7 @@
import org.apache.pig.impl.mapReduceLayer.LocalLauncher;
import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.impl.physicalLayer.PhysicalOperator;
+import org.apache.pig.impl.physicalLayer.plans.PlanPrinter;
import org.apache.pig.impl.physicalLayer.relationalOperators.POStore;
import org.apache.pig.impl.plan.VisitorException;
import java.util.Iterator;
@@ -163,7 +164,16 @@
}
public void explain(PhysicalPlan plan, PrintStream stream) {
- // TODO FIX
+ try {
+ PlanPrinter printer = new PlanPrinter(plan);
+ printer.visit();
+ stream.println();
+
+ LocalLauncher launcher = new LocalLauncher();
+ launcher.explain(plan, pigContext, stream);
+ } catch (Exception ve) {
+ throw new RuntimeException(ve);
+ }
}
public Collection<ExecJob> runningJobs(Properties properties) throws ExecException {
Modified: incubator/pig/branches/types/src/org/apache/pig/data/DataByteArray.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/DataByteArray.java?rev=675362&r1=675361&r2=675362&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/DataByteArray.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/DataByteArray.java Wed Jul 9 15:20:46 2008
@@ -110,22 +110,29 @@
return new String(mData);
}
+ /**
+ * Compare two byte arrays. Comparison is done first using byte values
+ * then length. So "g" will be greater than "abcdefg", but "hello worlds"
+ * is greater than "hello world". If the other object is not a
+ * DataByteArray, DataType.compare will be called.
+ * @param other Other object to compare to.
+ * @return -1 if less than, 1 if greater than, 0 if equal.
+ */
public int compareTo(Object other) {
if (other instanceof DataByteArray) {
DataByteArray dba = (DataByteArray)other;
int mySz = mData.length;
int tSz = dba.mData.length;
- if (tSz < mySz) {
- return 1;
- } else if (tSz > mySz) {
- return -1;
- } else {
- for (int i = 0; i < mySz; i++) {
- if (mData[i] < dba.mData[i]) return -1;
- else if (mData[i] > dba.mData[i]) return 1;
- }
- return 0;
+ int i;
+ for (i = 0; i < mySz; i++) {
+ // If the other has run out of characters, we're bigger.
+ if (i >= tSz) return 1;
+ if (mData[i] < dba.mData[i]) return -1;
+ else if (mData[i] > dba.mData[i]) return 1;
}
+ // If the other still has characters left, it's greater
+ if (i < tSz) return -1;
+ return 0;
} else {
return DataType.compare(this, other);
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java?rev=675362&r1=675361&r2=675362&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/builtin/FindQuantiles.java Wed Jul 9 15:20:46 2008
@@ -18,6 +18,7 @@
package org.apache.pig.impl.builtin;
import java.io.IOException;
+import java.util.Comparator;
import java.util.Iterator;
import org.apache.pig.EvalFunc;
@@ -30,6 +31,14 @@
public class FindQuantiles extends EvalFunc<DataBag>{
BagFactory mBagFactory = BagFactory.getInstance();
+ private class SortComparator implements Comparator<Tuple> {
+ public int compare(Tuple t1, Tuple t2) {
+ return t1.compareTo(t2);
+ }
+ }
+
+ private Comparator<Tuple> mComparator = new SortComparator();
+
/**
* first field in the input tuple is the number of quantiles to generate
* second field is the *sorted* bag of samples
@@ -47,7 +56,8 @@
ioe.initCause(e);
throw ioe;
}
- DataBag output = mBagFactory.newDefaultBag();
+ // TODO If user provided a comparator we should be using that.
+ DataBag output = mBagFactory.newSortedBag(mComparator);
long numSamples = samples.size();
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java?rev=675362&r1=675361&r2=675362&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java Wed Jul 9 15:20:46 2008
@@ -93,21 +93,21 @@
@Override
public Schema getSchema() throws FrontendException {
- log.info("Entering getSchema");
+ log.trace("Entering getSchema");
if (!mIsSchemaComputed) {
List<Schema.FieldSchema> fss = new ArrayList<Schema.FieldSchema>(
mForEachPlans.size());
for (LogicalPlan plan : mForEachPlans) {
- log.info("Number of leaves in " + plan + " = " + plan.getLeaves().size());
+ log.debug("Number of leaves in " + plan + " = " + plan.getLeaves().size());
for(int i = 0; i < plan.getLeaves().size(); ++i) {
- log.info("Leaf" + i + "= " + plan.getLeaves().get(i));
+ log.debug("Leaf" + i + "= " + plan.getLeaves().get(i));
}
//LogicalOperator op = plan.getRoots().get(0);
LogicalOperator op = plan.getLeaves().get(0);
- log.info("op: " + op.getClass().getName() + " " + op);
+ log.debug("op: " + op.getClass().getName() + " " + op);
}
- log.info("Printed the leaves of the generate plans");
+ log.debug("Printed the leaves of the generate plans");
Map<Schema.FieldSchema, String> flattenAlias = new HashMap<Schema.FieldSchema, String>();
Map<String, Boolean> inverseFlattenAlias = new HashMap<String, Boolean>();
@@ -116,19 +116,19 @@
for (int planCtr = 0; planCtr < mForEachPlans.size(); ++planCtr) {
LogicalPlan plan = mForEachPlans.get(planCtr);
LogicalOperator op = plan.getLeaves().get(0);
- log.info("op: " + op.getClass().getName() + " " + op);
- log.info("Flatten: " + mFlatten.get(planCtr));
+ log.debug("op: " + op.getClass().getName() + " " + op);
+ log.debug("Flatten: " + mFlatten.get(planCtr));
Schema.FieldSchema planFs;
try {
planFs = ((ExpressionOperator)op).getFieldSchema();
- log.info("planFs: " + planFs);
+ log.debug("planFs: " + planFs);
if(null != planFs) {
String outerCanonicalAlias = op.getAlias();
if(null == outerCanonicalAlias) {
outerCanonicalAlias = planFs.alias;
}
- log.info("Outer canonical alias: " + outerCanonicalAlias);
+ log.debug("Outer canonical alias: " + outerCanonicalAlias);
if(mFlatten.get(planCtr)) {
//need to extract the children and create the aliases
//assumption here is that flatten is only for one column
@@ -137,8 +137,8 @@
Schema s = planFs.schema;
if(null != s) {
for(Schema.FieldSchema fs: s.getFields()) {
- log.info("fs: " + fs);
- log.info("fs.alias: " + fs.alias);
+ log.debug("fs: " + fs);
+ log.debug("fs.alias: " + fs.alias);
String innerCanonicalAlias = fs.alias;
if((null != outerCanonicalAlias) && (null != innerCanonicalAlias)) {
String disambiguatorAlias = outerCanonicalAlias + "::" + innerCanonicalAlias;
@@ -196,19 +196,19 @@
//check for duplicate column names and throw an error if there are duplicates
//ensure that flatten gets rid of duplicate column names when the checks are
//being done
- log.info(" flattenAlias: " + flattenAlias);
- log.info(" inverseFlattenAlias: " + inverseFlattenAlias);
- log.info(" aliases: " + aliases);
- log.info(" fss.size: " + fss.size());
+ log.debug(" flattenAlias: " + flattenAlias);
+ log.debug(" inverseFlattenAlias: " + inverseFlattenAlias);
+ log.debug(" aliases: " + aliases);
+ log.debug(" fss.size: " + fss.size());
boolean duplicates = false;
Map<String, Integer> duplicateAliases = new HashMap<String, Integer>();
for(String alias: aliases.keySet()) {
Integer count = aliases.get(alias);
if(count > 1) {//not checking for null here as counts are intitalized to 1
Boolean inFlatten = false;
- log.info("inFlatten: " + inFlatten + " inverseFlattenAlias: " + inverseFlattenAlias);
+ log.debug("inFlatten: " + inFlatten + " inverseFlattenAlias: " + inverseFlattenAlias);
inFlatten = inverseFlattenAlias.get(alias);
- log.info("inFlatten: " + inFlatten + " inverseFlattenAlias: " + inverseFlattenAlias);
+ log.debug("inFlatten: " + inFlatten + " inverseFlattenAlias: " + inverseFlattenAlias);
if((null == inFlatten) || (!inFlatten)) {
duplicates = true;
duplicateAliases.put(alias, count);
@@ -237,14 +237,14 @@
String alias = flattenAlias.get(fs);
Integer count = aliases.get(alias);
if (null == count) count = 1;
- log.info("alias: " + alias);
+ log.debug("alias: " + alias);
if((null != alias) && (count == 1)) {
mSchema.addAlias(alias, fs);
}
}
mIsSchemaComputed = true;
}
- log.info("Exiting getSchema");
+ log.trace("Exiting getSchema");
return mSchema;
}
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java?rev=675362&r1=675361&r2=675362&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java Wed Jul 9 15:20:46 2008
@@ -88,6 +88,10 @@
mSortFunc = func;
}
+ public boolean isStar() {
+ return mIsStar;
+ }
+
public void setStar(boolean b) {
mIsStar = b;
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogToPhyTranslationVisitor.java?rev=675362&r1=675361&r2=675362&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogToPhyTranslationVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogToPhyTranslationVisitor.java Wed Jul 9 15:20:46 2008
@@ -108,6 +108,7 @@
} catch (PlanException e) {
log.error("Invalid physical operators in the physical plan"
+ e.getMessage());
+ throw new VisitorException(e);
}
}
}
@@ -137,6 +138,7 @@
} catch (PlanException e) {
log.error("Invalid physical operators in the physical plan"
+ e.getMessage());
+ throw new VisitorException(e);
}
}
}
@@ -165,6 +167,7 @@
} catch (PlanException e) {
log.error("Invalid physical operators in the physical plan"
+ e.getMessage());
+ throw new VisitorException(e);
}
}
}
@@ -193,6 +196,7 @@
} catch (PlanException e) {
log.error("Invalid physical operators in the physical plan"
+ e.getMessage());
+ throw new VisitorException(e);
}
}
}
@@ -221,6 +225,7 @@
} catch (PlanException e) {
log.error("Invalid physical operators in the physical plan"
+ e.getMessage());
+ throw new VisitorException(e);
}
}
}
@@ -249,6 +254,7 @@
} catch (PlanException e) {
log.error("Invalid physical operators in the physical plan"
+ e.getMessage());
+ throw new VisitorException(e);
}
}
}
@@ -276,6 +282,7 @@
} catch (PlanException e) {
log.error("Invalid physical operators in the physical plan"
+ e.getMessage());
+ throw new VisitorException(e);
}
}
}
@@ -303,6 +310,7 @@
} catch (PlanException e) {
log.error("Invalid physical operators in the physical plan"
+ e.getMessage());
+ throw new VisitorException(e);
}
}
}
@@ -330,6 +338,7 @@
} catch (PlanException e) {
log.error("Invalid physical operators in the physical plan"
+ e.getMessage());
+ throw new VisitorException(e);
}
}
}
@@ -357,6 +366,7 @@
} catch (PlanException e) {
log.error("Invalid physical operators in the physical plan"
+ e.getMessage());
+ throw new VisitorException(e);
}
}
}
@@ -384,6 +394,7 @@
} catch (PlanException e) {
log.error("Invalid physical operators in the physical plan"
+ e.getMessage());
+ throw new VisitorException(e);
}
}
}
@@ -411,6 +422,7 @@
} catch (PlanException e) {
log.error("Invalid physical operators in the physical plan"
+ e.getMessage());
+ throw new VisitorException(e);
}
}
}
@@ -434,6 +446,7 @@
currentPlan.connect(from, exprOp);
} catch (PlanException e) {
log.error("Invalid physical operators in the physical plan" + e.getMessage());
+ throw new VisitorException(e);
}
}
}
@@ -457,6 +470,7 @@
currentPlan.connect(from, exprOp);
} catch (PlanException e) {
log.error("Invalid physical operators in the physical plan" + e.getMessage());
+ throw new VisitorException(e);
}
}
}
@@ -478,6 +492,7 @@
currentPlan.connect(from, exprOp);
} catch (PlanException e) {
log.error("Invalid physical operators in the physical plan" + e.getMessage());
+ throw new VisitorException(e);
}
}
@@ -501,6 +516,7 @@
} catch (PlanException e1) {
log.error("Invalid physical operators in the physical plan"
+ e1.getMessage());
+ throw new VisitorException(e1);
}
int count = 0;
@@ -543,6 +559,7 @@
} catch (PlanException e) {
log.error("Invalid physical operators in the physical plan"
+ e.getMessage());
+ throw new VisitorException(e);
}
}
@@ -582,6 +599,7 @@
} catch (PlanException e) {
log.error("Invalid physical operators in the physical plan"
+ e.getMessage());
+ throw new VisitorException(e);
}
}
@@ -612,6 +630,7 @@
log.error("Invalid physical operators in the physical plan"
+ e.getMessage());
+ throw new VisitorException(e);
}
}
}
@@ -657,6 +676,7 @@
} catch (PlanException e) {
log.error("Invalid physical operators in the physical plan"
+ e.getMessage());
+ throw new VisitorException(e);
}
}
@@ -703,6 +723,7 @@
currentPlan.connect(from, sort);
} catch (PlanException e) {
log.error("Invalid physical operator in the plan" + e.getMessage());
+ throw new VisitorException(e);
}
sort.setResultType(s.getType());
@@ -727,6 +748,7 @@
currentPlan.connect(from, physOp);
} catch (PlanException e) {
log.error("Invalid physical operator in the plan" + e.getMessage());
+ throw new VisitorException(e);
}
}
@@ -744,6 +766,7 @@
currentPlan.connect(from, physOp);
} catch (PlanException e) {
log.error("Invalid physical operator in the plan" + e.getMessage());
+ throw new VisitorException(e);
}
}
@@ -772,6 +795,7 @@
currentPlan.connect(from, physOp);
} catch (PlanException e) {
log.error("Invalid physical operator in the plan" + e.getMessage());
+ throw new VisitorException(e);
}
}
@@ -799,6 +823,7 @@
} catch (PlanException e) {
log.error("Invalid physical operator in the plan"
+ e.getMessage());
+ throw new VisitorException(e);
}
}
LogToPhyMap.put(func, p);
@@ -834,6 +859,7 @@
currentPlan.connect(from, store);
} catch (PlanException e) {
log.error("Invalid physical operator in the plan" + e.getMessage());
+ throw new VisitorException(e);
}
LogToPhyMap.put(loStore, store);
}
@@ -878,6 +904,7 @@
} catch (PlanException e) {
log.error("Invalid physical operator in the plan"
+ e.getMessage());
+ throw new VisitorException(e);
}
}
@@ -899,6 +926,7 @@
currentPlan.connect(from, physOp);
} catch (PlanException e) {
log.error("Invalid physical operator in the plan" + e.getMessage());
+ throw new VisitorException(e);
}
}
@@ -918,6 +946,7 @@
currentPlan.connect(from, physOp);
} catch (PlanException e) {
log.error("Invalid physical operator in the plan" + e.getMessage());
+ throw new VisitorException(e);
}
}
@@ -939,6 +968,7 @@
currentPlan.connect(from, physOp);
} catch (PlanException e) {
log.error("Invalid physical operator in the plan" + e.getMessage());
+ throw new VisitorException(e);
}
}
@@ -959,6 +989,7 @@
currentPlan.connect(from, physOp);
} catch (PlanException e) {
log.error("Invalid physical operator in the plan" + e.getMessage());
+ throw new VisitorException(e);
}
}
@@ -980,6 +1011,7 @@
} catch (PlanException e) {
log.error("Invalid physical operator in the plan"
+ e.getMessage());
+ throw new VisitorException(e);
}
}
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java?rev=675362&r1=675361&r2=675362&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java Wed Jul 9 15:20:46 2008
@@ -25,13 +25,24 @@
import java.util.List;
import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataByteArray;
import org.apache.pig.ComparisonFunc;
import org.apache.pig.data.DataType;
import org.apache.pig.data.IndexedTuple;
@@ -61,6 +72,8 @@
Configuration conf;
PigContext pigContext;
+ private final Log log = LogFactory.getLog(getClass());
+
/**
* The map between MapReduceOpers and their corresponding Jobs
*/
@@ -257,9 +270,7 @@
jobConf.set("pig.reduce.package", ObjectSerializer.serialize(pack));
Class<? extends WritableComparable> keyClass = DataType.getWritableComparableTypes(pack.getKeyType()).getClass();
jobConf.setOutputKeyClass(keyClass);
- if(keyClass.equals(TupleFactory.getInstance().tupleClass())){
- jobConf.setOutputKeyComparatorClass(PigWritableComparator.class);
- }
+ selectComparator(mro, pack.getKeyType(), jobConf);
jobConf.setOutputValueClass(IndexedTuple.class);
}
@@ -290,12 +301,128 @@
}
public static class PigWritableComparator extends WritableComparator {
- public PigWritableComparator() {
- super(TupleFactory.getInstance().tupleClass());
+ protected PigWritableComparator(Class c) {
+ super(c);
}
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2){
return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
}
}
+
+ public static class PigIntWritableComparator extends PigWritableComparator {
+ public PigIntWritableComparator() {
+ super(IntWritable.class);
+ }
+ }
+
+ public static class PigLongWritableComparator extends PigWritableComparator {
+ public PigLongWritableComparator() {
+ super(LongWritable.class);
+ }
+ }
+
+ public static class PigFloatWritableComparator extends PigWritableComparator {
+ public PigFloatWritableComparator() {
+ super(FloatWritable.class);
+ }
+ }
+
+ /*
+ public static class PigDoubleWritableComparator extends PigWritableComparator {
+ public PigDoubleWritableComparator() {
+ super(Double.class);
+ }
+ }
+ */
+
+ public static class PigCharArrayWritableComparator extends PigWritableComparator {
+ public PigCharArrayWritableComparator() {
+ super(Text.class);
+ }
+ }
+
+ public static class PigDBAWritableComparator extends PigWritableComparator {
+ public PigDBAWritableComparator() {
+ super(BytesWritable.class);
+ }
+ }
+
+ public static class PigTupleWritableComparator extends PigWritableComparator {
+ public PigTupleWritableComparator() {
+ super(TupleFactory.getInstance().tupleClass());
+ }
+ }
+
+ public static class PigBagWritableComparator extends PigWritableComparator {
+ public PigBagWritableComparator() {
+ super(BagFactory.getInstance().newDefaultBag().getClass());
+ }
+ }
+
+ private void selectComparator(
+ MapReduceOper mro,
+ byte keyType,
+ JobConf jobConf) throws JobCreationException {
+ // If this operator is involved in an order by, use the native
+ // comparators. Otherwise use bytewise comparison. Have to
+ // look at the next operator too because if we're the quantile
+ // operation we need to use the native comparators.
+ boolean involved = false;
+ if (mro.isGlobalSort()) {
+ involved = true;
+ } else {
+ List<MapReduceOper> succs = plan.getSuccessors(mro);
+ if (succs != null) {
+ MapReduceOper succ = succs.get(0);
+ if (succ.isGlobalSort()) involved = true;
+ }
+ }
+ if (!involved) {
+ switch (keyType) {
+ case DataType.INTEGER:
+ jobConf.setOutputKeyComparatorClass(PigIntWritableComparator.class);
+ break;
+
+ case DataType.LONG:
+ jobConf.setOutputKeyComparatorClass(PigLongWritableComparator.class);
+ break;
+
+ case DataType.FLOAT:
+ jobConf.setOutputKeyComparatorClass(PigFloatWritableComparator.class);
+ break;
+
+ case DataType.DOUBLE:
+ //jobConf.setOutputKeyComparatorClass(PigDoubleWritableComparator.class);
+ log.error("Waiting for Hadoop to support DoubleWritable");
+ throw new JobCreationException("Waiting for Hadoop to support DoubleWritable");
+
+ case DataType.CHARARRAY:
+ jobConf.setOutputKeyComparatorClass(PigCharArrayWritableComparator.class);
+ break;
+
+ case DataType.BYTEARRAY:
+ jobConf.setOutputKeyComparatorClass(PigDBAWritableComparator.class);
+ break;
+
+ case DataType.MAP:
+ log.error("Using Map as key not supported.");
+ throw new JobCreationException("Using Map as key not supported");
+
+ case DataType.TUPLE:
+ jobConf.setOutputKeyComparatorClass(PigTupleWritableComparator.class);
+ break;
+
+ case DataType.BAG:
+ jobConf.setOutputKeyComparatorClass(PigBagWritableComparator.class);
+ break;
+
+ default:
+ throw new RuntimeException("Forgot case for type " +
+ DataType.findTypeName(keyType));
+ }
+
+ }
+ }
+
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/Launcher.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/Launcher.java?rev=675362&r1=675361&r2=675362&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/Launcher.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/Launcher.java Wed Jul 9 15:20:46 2008
@@ -1,6 +1,7 @@
package org.apache.pig.impl.mapReduceLayer;
import java.io.IOException;
+import java.io.PrintStream;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -62,6 +63,22 @@
public abstract boolean launchPig(PhysicalPlan php, String grpName, PigContext pc)
throws PlanException, VisitorException, IOException, ExecException,
JobCreationException;
+
+ /**
+ * Explain how a pig job will be executed on the underlying
+ * infrastructure.
+ * @param pp PhysicalPlan to explain
+ * @param pc PigContext to use for configuration
+ * @param ps PrintStream to write output on.
+ * @throws VisitorException
+ * @throws IOException
+ */
+ public abstract void explain(
+ PhysicalPlan pp,
+ PigContext pc,
+ PrintStream ps) throws PlanException,
+ VisitorException,
+ IOException;
protected boolean isComplete(double prog){
return (int)(Math.ceil(prog)) == (int)1;
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/LocalLauncher.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/LocalLauncher.java?rev=675362&r1=675361&r2=675362&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/LocalLauncher.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/LocalLauncher.java Wed Jul 9 15:20:46 2008
@@ -1,6 +1,7 @@
package org.apache.pig.impl.mapReduceLayer;
import java.io.IOException;
+import java.io.PrintStream;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -13,6 +14,7 @@
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.impl.mapReduceLayer.plans.MRPrinter;
import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
@@ -79,7 +81,21 @@
return isComplete(lastProg);
}
-
+
+ @Override
+ public void explain(PhysicalPlan php,
+ PigContext pc,
+ PrintStream ps) throws PlanException,
+ VisitorException,
+ IOException {
+ MRCompiler comp = new MRCompiler(php, pc);
+ comp.compile();
+ MROperPlan mrp = comp.getMRPlan();
+
+ MRPrinter printer = new MRPrinter(ps, mrp);
+ printer.visit();
+ }
+
//A purely testing method. Not to be used elsewhere
public boolean launchPigWithCombinePlan(PhysicalPlan php,
String grpName, PigContext pc, PhysicalPlan combinePlan) throws PlanException,
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java?rev=675362&r1=675361&r2=675362&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MRCompiler.java Wed Jul 9 15:20:46 2008
@@ -179,14 +179,6 @@
*/
public MROperPlan compile() throws IOException, PlanException, VisitorException {
List<PhysicalOperator> leaves = plan.getLeaves();
- /*for (PhysicalOperator operator : leaves) {
- compile(operator);
- if (!curMROp.isMapDone()) {
- curMROp.setMapDone(true);
- } else if (!curMROp.isReduceDone()) {
- curMROp.setReduceDone(true);
- }
- }*/
POStore store = (POStore)leaves.get(0);
compile(store);
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java?rev=675362&r1=675361&r2=675362&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java Wed Jul 9 15:20:46 2008
@@ -1,6 +1,7 @@
package org.apache.pig.impl.mapReduceLayer;
import java.io.IOException;
+import java.io.PrintStream;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -15,6 +16,7 @@
import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.impl.mapReduceLayer.plans.MRPrinter;
import org.apache.pig.impl.physicalLayer.PhysicalOperator;
import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.impl.plan.PlanException;
@@ -83,4 +85,19 @@
return isComplete(lastProg);
}
+
+ @Override
+ public void explain(PhysicalPlan php, // compiles php into an MR plan and prints that plan to ps
+ PigContext pc,
+ PrintStream ps) throws PlanException,
+ VisitorException,
+ IOException {
+ MRCompiler comp = new MRCompiler(php, pc); // translate the physical plan into an MROperPlan
+ comp.compile();
+ MROperPlan mrp = comp.getMRPlan();
+ // MRPrinter writes a header banner to ps, then one section per MapReduce node as it walks mrp.
+ MRPrinter printer = new MRPrinter(ps, mrp);
+ printer.visit(); // NOTE(review): MRPrinter builds its nested PlanPrinters without ps; confirm where map/combine/reduce plans are printed
+ }
+
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapReduce.java?rev=675362&r1=675361&r2=675362&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapReduce.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapReduce.java Wed Jul 9 15:20:46 2008
@@ -53,7 +53,7 @@
* key and indexed tuple and collect it into the output
* collector.
*
- * The shuffle and sort phase sorts these key & indexed tuples
+ * The shuffle and sort phase sorts these key & indexed tuples
* and creates key, List<IndexedTuple> and passes the key and
* iterator to the list. The deserialized POPackage operator
* is used to package the key, List<IndexedTuple> into pigKey,
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java?rev=675362&r1=675361&r2=675362&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java Wed Jul 9 15:20:46 2008
@@ -29,6 +29,7 @@
import org.apache.hadoop.mapred.Partitioner;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
import org.apache.pig.impl.io.FileLocalizer;
@@ -63,10 +64,18 @@
t = loader.getNext();
if (t==null)
break;
- quantiles.add(t);
+ // Need to strip the outer tuple and bag.
+ Object o = t.get(0);
+ if (o instanceof DataBag) {
+ for (Tuple it : (DataBag)o) {
+ quantiles.add(it);
+ }
+ } else {
+ quantiles.add(t);
+ }
}
this.quantiles = quantiles.toArray(new Tuple[0]);
- }catch (IOException e){
+ }catch (Exception e){
throw new RuntimeException(e);
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/MROpPlanVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/MROpPlanVisitor.java?rev=675362&r1=675361&r2=675362&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/MROpPlanVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/MROpPlanVisitor.java Wed Jul 9 15:20:46 2008
@@ -32,13 +32,7 @@
super(plan, walker);
}
- @Override
- public void visit() throws VisitorException {
- // TODO Auto-generated method stub
-
- }
-
- public void visitMROp(MapReduceOper mr) {
+ public void visitMROp(MapReduceOper mr) throws VisitorException {
// do nothing
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/MRPrinter.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/MRPrinter.java?rev=675362&r1=675361&r2=675362&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/MRPrinter.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/plans/MRPrinter.java Wed Jul 9 15:20:46 2008
@@ -22,6 +22,8 @@
import java.util.List;
import java.io.PrintStream;
+import org.apache.pig.impl.mapReduceLayer.MapReduceOper;
+import org.apache.pig.impl.physicalLayer.plans.PlanPrinter;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.VisitorException;
@@ -38,252 +40,39 @@
* @param plan MR plan to print
*/
public MRPrinter(PrintStream ps, MROperPlan plan) {
- super(plan, new DepthFirstWalker(plan));
- }
-
- /* TODO FIX
- public void visit(LOAdd a) throws VisitorException {
- visitBinary(a, "+");
- }
-
- public void visit(LOAnd a) throws VisitorException {
- visitBinary(a, "AND");
- }
-
- public void visit(LOBinCond bc) throws VisitorException {
- print(bc);
- mStream.print(" COND: (");
- bc.getCond().visit(this);
- mStream.print(") TRUE: (");
- bc.getLhsOp().visit(this);
- mStream.print(") FALSE (");
- bc.getRhsOp().visit(this);
- mStream.print(")");
- }
-
- public void visit(LOCogroup g) throws VisitorException {
- print(g);
- mStream.print("GROUP BY PLANS:");
- MultiMap<LogicalOperator, LogicalPlan> plans = g.getGroupByPlans();
- for (LogicalOperator lo : plans.keySet()) {
- // Visit the associated plans
- for (LogicalPlan plan : plans.get(lo)) {
- mIndent++;
- pushWalker(new DepthFirstWalker(plan));
- visit();
- popWalker();
- mIndent--;
- }
- mStream.println();
- }
- // Visit input operators
- for (LogicalOperator lo : plans.keySet()) {
- // Visit the operator
- lo.visit(this);
- }
- }
-
- public void visit(LOConst c) throws VisitorException {
- print(c);
- mStream.print(" VALUE (" + c.getValue() + ")");
- }
-
- public void visit(LOCross c) throws VisitorException {
- print(c);
- mStream.println();
- super.visit(c);
- }
-
- public void visit(LODistinct d) throws VisitorException {
- print(d);
- mStream.println();
- super.visit(d);
- }
-
- public void visit(LODivide d) throws VisitorException {
- visitBinary(d, "/");
- }
-
- public void visit(LOEqual e) throws VisitorException {
- visitBinary(e, "==");
- }
-
- public void visit(LOFilter f) throws VisitorException {
- print(f);
- mStream.print(" COMP: ");
- mIndent++;
- pushWalker(new DepthFirstWalker(f.getComparisonPlan()));
- visit();
- mIndent--;
- mStream.println();
- f.getInput().visit(this);
- }
-
- public void visit(LOForEach f) throws VisitorException {
- print(f);
- mStream.print(" PLAN: ");
- mIndent++;
- pushWalker(new DepthFirstWalker(f.getForEachPlan()));
- visit();
- mIndent--;
- mStream.println();
- // Visit our input
- mPlan.getPredecessors((LogicalOperator)f).get(0).visit(this);
- }
-
- public void visit(LOGreaterThan gt) throws VisitorException {
- visitBinary(gt, ">");
- }
-
- public void visit(LOGreaterThanEqual gte) throws VisitorException {
- visitBinary(gte, ">=");
- }
-
- public void visit(LOLesserThan lt) throws VisitorException {
- visitBinary(lt, "<");
- }
-
- public void visit(LOLesserThanEqual lte) throws VisitorException {
- visitBinary(lte, "<=");
- }
-
- public void visit(LOLoad load) throws VisitorException {
- print(load);
- mStream.print(" FILE: " + load.getInputFile().getFileName());
- mStream.print(" FUNC: " + load.getLoadFunc().getClass().getName());
- mStream.println();
- }
-
- public void visit(LOMapLookup mlu) throws VisitorException {
- print(mlu);
- mStream.print("(");
- mlu.getMap().visit(this);
- mStream.print(")# " + mlu.getKey());
- }
-
- public void visit(LOMod m) throws VisitorException {
- visitBinary(m, "MOD");
- }
-
- public void visit(LOMultiply m) throws VisitorException {
- visitBinary(m, "*");
- }
-
- public void visit(LONegative n) throws VisitorException {
- visitUnary(n, "-");
- }
-
- public void visit(LONot n) throws VisitorException {
- visitUnary(n, "NOT");
- }
-
- public void visit(LONotEqual ne) throws VisitorException {
- visitBinary(ne, "!=");
- }
-
- public void visit(LOOr or) throws VisitorException {
- visitBinary(or, "OR");
- }
-
- public void visit(LOProject p) throws VisitorException {
- print(p);
- if (p.isStar()) {
- mStream.print(" ALL ");
- } else {
- List<Integer> cols = p.getProjection();
- mStream.print(" COL");
- if (cols.size() > 1) mStream.print("S");
- mStream.print(" (");
- for (int i = 0; i < cols.size(); i++) {
- if (i > 0) mStream.print(", ");
- mStream.print(cols.get(i));
- }
- mStream.print(")");
- }
- mStream.print(" FROM ");
- if (p.getSentinel()) {
- // This project is connected to some other relation, don't follow
- // that path or we'll cycle in the graph.
- p.getExpression().name();
- } else {
- mIndent++;
- p.getExpression().visit(this);
- mIndent--;
- }
- }
-
- public void visit(LORegexp r) throws VisitorException {
- print(r);
- mStream.print(" REGEX (" + r.getRegexp() + ") LOOKING IN (");
- r.getOperand().visit(this);
- mStream.print(")");
- }
-
- private void print(LogicalOperator lo, String name) {
- List<EvalSpec> empty = new ArrayList<EvalSpec>();
- print(lo, name, empty);
- }
-
- private void visitBinary(
- BinaryExpressionOperator b,
- String op) throws VisitorException {
- print(b);
- mStream.print(" (");
- b.getLhsOperand().visit(this);
- mStream.print(") " + op + " (");
- b.getRhsOperand().visit(this);
- mStream.print(") ");
- }
-
- private void visitUnary(
- UnaryExpressionOperator e,
- String op) throws VisitorException {
- print(e);
- mStream.print(op + " (");
- e.getOperand().visit(this);
- mStream.print(") ");
- }
-
- private void print(LogicalOperator lo) {
- for (int i = 0; i < mIndent; i++) mStream.print(" ");
-
- printName(lo);
-
- if (!(lo instanceof ExpressionOperator)) {
- mStream.print("Inputs: ");
- for (LogicalOperator predecessor : mPlan.getPredecessors(lo)) {
- printName(predecessor);
- }
- mStream.print("Schema: ");
- try {
- printSchema(lo.getSchema());
- } catch (FrontendException fe) {
- // ignore it, nothing we can do
- mStream.print("()");
- }
- }
- mStream.print(" : ");
- }
-
- private void printName(LogicalOperator lo) {
- mStream.println(lo.name() + " key(" + lo.getOperatorKey().scope +
- ", " + lo.getOperatorKey().id + ") ");
- }
-
- private void printSchema(Schema schema) {
- mStream.print("(");
- for (Schema.FieldSchema fs : schema.getFields()) {
- if (fs.alias != null) mStream.print(fs.alias + ": ");
- mStream.print(DataType.findTypeName(fs.type));
- if (fs.schema != null) {
- if (fs.type == DataType.BAG) mStream.print("{");
- printSchema(fs.schema);
- if (fs.type == DataType.BAG) mStream.print("}");
- }
+ super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
+ mStream = ps;
+ mStream.println("--------------------------------------------------");
+ mStream.println("| Map Reduce Plan |");
+ mStream.println("--------------------------------------------------");
+ }
+
+ @Override
+ public void visitMROp(MapReduceOper mr) throws VisitorException {
+ mStream.println("MapReduce node " + mr.getOperatorKey().toString());
+ if (mr.mapPlan != null && mr.mapPlan.size() > 0) {
+ mStream.println("Map Plan");
+ PlanPrinter printer = new PlanPrinter(mr.mapPlan);
+ printer.visit();
+ mStream.println("--------");
+ }
+ if (mr.combinePlan != null && mr.combinePlan.size() > 0) {
+ mStream.println("Combine Plan");
+ PlanPrinter printer = new PlanPrinter(mr.combinePlan);
+ printer.visit();
+ mStream.println("--------");
+ }
+ if (mr.reducePlan != null && mr.reducePlan.size() > 0) {
+ mStream.println("Reduce Plan");
+ PlanPrinter printer = new PlanPrinter(mr.reducePlan);
+ printer.visit();
+ mStream.println("--------");
+ }
+ mStream.println("Global sort: " + mr.isGlobalSort());
+ if (mr.getQuantFile() != null) {
+ mStream.println("Quantile file: " + mr.getQuantFile());
}
- mStream.print(")");
+ mStream.println("----------------");
}
- */
}
-
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSort.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSort.java?rev=675362&r1=675361&r2=675362&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSort.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSort.java Wed Jul 9 15:20:46 2008
@@ -66,6 +66,7 @@
private List<Boolean> mAscCols;
private POUserComparisonFunc mSortFunc;
private final Log log = LogFactory.getLog(getClass());
+ private Comparator<Tuple> mComparator;
private boolean inputsAccumulated = false;
public boolean isUDFComparatorUsed = false;
@@ -80,16 +81,18 @@
this.mAscCols = mAscCols;
this.mSortFunc = mSortFunc;
if (mSortFunc == null) {
- sortedBag = BagFactory.getInstance().newSortedBag(
- new SortComparator());
+ mComparator = new SortComparator();
+ /*sortedBag = BagFactory.getInstance().newSortedBag(
+ new SortComparator());*/
ExprOutputTypes = new ArrayList<Byte>(sortPlans.size());
for(PhysicalPlan plan : sortPlans) {
ExprOutputTypes.add(plan.getLeaves().get(0).getResultType());
}
} else {
- sortedBag = BagFactory.getInstance().newSortedBag(
- new UDFSortComparator());
+ /*sortedBag = BagFactory.getInstance().newSortedBag(
+ new UDFSortComparator());*/
+ mComparator = new UDFSortComparator();
isUDFComparatorUsed = true;
}
}
@@ -171,6 +174,15 @@
case DataType.LONG:
res = Op.getNext(dummyLong);
break;
+ case DataType.TUPLE:
+ res = Op.getNext(dummyTuple);
+ break;
+
+ default:
+ String msg = new String("Did not expect result of type " +
+ DataType.findTypeName(resultType));
+ log.error(msg);
+ throw new RuntimeException(msg);
}
return res;
}
@@ -220,6 +232,7 @@
Result res = new Result();
if (!inputsAccumulated) {
res = processInput();
+ sortedBag = BagFactory.getInstance().newSortedBag(mComparator);
while (res.returnStatus != POStatus.STATUS_EOP) {
if (res.returnStatus == POStatus.STATUS_ERR) {
log.error("Error in reading from the inputs");
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestDataModel.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestDataModel.java?rev=675362&r1=675361&r2=675362&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestDataModel.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestDataModel.java Wed Jul 9 15:20:46 2008
@@ -314,6 +314,67 @@
}
@Test
+ public void testMultiFieldTupleCompareTo() throws Exception {
+ TupleFactory tf = TupleFactory.getInstance();
+ // t1 stays ("bbb", "bbb"); each case below rebuilds t2 and checks t1.compareTo(t2) column by column.
+ Tuple t1 = tf.newTuple();
+ Tuple t2 = tf.newTuple();
+
+ t1.append(new DataByteArray("bbb"));
+ t1.append(new DataByteArray("bbb"));
+ t2.append(new DataByteArray("bbb"));
+ t2.append(new DataByteArray("bbb"));
+
+ assertEquals("same data equal", 0, t1.compareTo(t2));
+
+ t2 = tf.newTuple();
+ t2.append(new DataByteArray("aaa"));
+ t2.append(new DataByteArray("aaa"));
+ assertEquals("greater than tuple with lesser value", 1, t1.compareTo(t2));
+
+ t2 = tf.newTuple();
+ t2.append(new DataByteArray("ddd"));
+ t2.append(new DataByteArray("ddd"));
+ assertEquals("less than tuple with greater value", -1, t1.compareTo(t2));
+
+ // First column same, second lesser
+ t2 = tf.newTuple();
+ t2.append(new DataByteArray("bbb"));
+ t2.append(new DataByteArray("aaa"));
+ assertEquals("greater than tuple with same first, lesser second column", 1, t1.compareTo(t2));
+
+ // First column same, second greater
+ t2 = tf.newTuple();
+ t2.append(new DataByteArray("bbb"));
+ t2.append(new DataByteArray("ccc"));
+ assertEquals("less than tuple with same first, greater second column", -1, t1.compareTo(t2));
+
+ // First column less, second same
+ t2 = tf.newTuple();
+ t2.append(new DataByteArray("aaa"));
+ t2.append(new DataByteArray("bbb"));
+ assertEquals("greater than tuple with lesser first, same second column", 1, t1.compareTo(t2));
+
+ // First column greater, second same
+ t2 = tf.newTuple();
+ t2.append(new DataByteArray("ccc"));
+ t2.append(new DataByteArray("bbb"));
+ assertEquals("less than tuple with greater first, same second column", -1, t1.compareTo(t2));
+
+ // First column less, second greater
+ t2 = tf.newTuple();
+ t2.append(new DataByteArray("aaa"));
+ t2.append(new DataByteArray("ccc"));
+ assertEquals("greater than tuple with lesser first, greater second column", 1, t1.compareTo(t2));
+
+ // First column greater, second lesser
+ t2 = tf.newTuple();
+ t2.append(new DataByteArray("ccc"));
+ t2.append(new DataByteArray("aaa"));
+ assertEquals("less than tuple with greater first, lesser second column", -1, t1.compareTo(t2));
+ }
+
+ @Test
public void testByteArrayToString() throws Exception {
DataByteArray ba = new DataByteArray("hello world");
@@ -350,10 +411,23 @@
assertTrue("same data", ba1.compareTo(ba2) == 0);
- assertFalse("lexically lower value less than",
+ assertTrue("different length lexically lower value less than",
ba3.compareTo(ba1) < 0);
- assertFalse("lexically higher value greater than",
+ assertTrue("different length lexically higher value greater than",
ba1.compareTo(ba3) > 0);
+
+ ba2 = new DataByteArray("hello worlc");
+ assertTrue("same length lexically lower value less than",
+ ba2.compareTo(ba1) < 0);
+ assertTrue("same length lexically higher value greater than",
+ ba1.compareTo(ba2) > 0);
+
+ ba2 = new DataByteArray("hello worlds");
+ assertTrue("shorter lexically same value less than",
+ ba1.compareTo(ba2) < 0);
+ assertTrue("longer lexically same value greater than",
+ ba2.compareTo(ba1) > 0);
+
}
private Tuple giveMeOneOfEach() throws Exception {