You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@pig.apache.org by rd...@apache.org on 2010/02/24 19:13:06 UTC
svn commit: r915907 - in /hadoop/pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/
src/org/apache/pig/impl/builtin/
Author: rding
Date: Wed Feb 24 18:13:05 2010
New Revision: 915907
URL: http://svn.apache.org/viewvc?rev=915907&view=rev
Log:
PIG-1079: Modify merge join to use distributed cache to maintain the index
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=915907&r1=915906&r2=915907&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Wed Feb 24 18:13:05 2010
@@ -133,6 +133,9 @@
BUG FIXES
+PIG-1079: Modify merge join to use distributed cache to maintain the index
+(rding)
+
PIG-1241: Accumulator is turned on when a map is used with a non-accumulative
UDF (yinghe vi olgan)
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=915907&r1=915906&r2=915907&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Wed Feb 24 18:13:05 2010
@@ -59,6 +59,7 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
@@ -221,26 +222,6 @@
return new Path(pathStr);
}
- private Path makeTmpPath() throws IOException {
- Path tmpPath = null;
- for (int tries = 0;;) {
- try {
- tmpPath =
- new Path(FileLocalizer
- .getTemporaryPath(null, pigContext).toString());
- FileSystem fs = tmpPath.getFileSystem(conf);
- tmpPath = tmpPath.makeQualified(fs);
- fs.mkdirs(tmpPath);
- break;
- } catch (IOException ioe) {
- if (++tries==100) {
- throw ioe;
- }
- }
- }
- return tmpPath;
- }
-
/**
* Compiles all jobs that have no dependencies removes them from
* the plan and returns. Should be called with the same plan until
@@ -520,7 +501,7 @@
// this call modifies the ReplFiles names of POFRJoin operators
// within the MR plans, must be called before the plans are
// serialized
- setupDistributedCacheForFRJoin(mro, pigContext, conf);
+ setupDistributedCacheForJoin(mro, pigContext, conf);
POPackage pack = null;
if(mro.reducePlan.isEmpty()){
@@ -653,13 +634,13 @@
}
public static class PigSecondaryKeyGroupComparator extends WritableComparator {
- @SuppressWarnings("unchecked")
public PigSecondaryKeyGroupComparator() {
// super(TupleFactory.getInstance().tupleClass(), true);
super(NullableTuple.class, true);
}
- @Override
+ @SuppressWarnings("unchecked")
+ @Override
public int compare(WritableComparable a, WritableComparable b)
{
PigNullableWritable wa = (PigNullableWritable)a;
@@ -947,13 +928,13 @@
}
}
- private void setupDistributedCacheForFRJoin(MapReduceOper mro,
+ private void setupDistributedCacheForJoin(MapReduceOper mro,
PigContext pigContext, Configuration conf) throws IOException {
- new FRJoinDistributedCacheVisitor(mro.mapPlan, pigContext, conf)
+ new JoinDistributedCacheVisitor(mro.mapPlan, pigContext, conf)
.visit();
- new FRJoinDistributedCacheVisitor(mro.reducePlan, pigContext, conf)
+ new JoinDistributedCacheVisitor(mro.reducePlan, pigContext, conf)
.visit();
}
@@ -1056,13 +1037,13 @@
return symlink;
}
- private static class FRJoinDistributedCacheVisitor extends PhyPlanVisitor {
+ private static class JoinDistributedCacheVisitor extends PhyPlanVisitor {
private PigContext pigContext = null;
private Configuration conf = null;
- public FRJoinDistributedCacheVisitor(PhysicalPlan plan,
+ public JoinDistributedCacheVisitor(PhysicalPlan plan,
PigContext pigContext, Configuration conf) {
super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(
plan));
@@ -1110,6 +1091,29 @@
throw new VisitorException(msg, e);
}
}
+
+ @Override
+ public void visitMergeJoin(POMergeJoin join) throws VisitorException {
+
+ // XXX Hadoop currently doesn't support distributed cache in local mode.
+ // This line will be removed after the support is added
+ if (pigContext.getExecType() == ExecType.LOCAL) return;
+
+ String indexFile = join.getIndexFile();
+
+ // merge join may not use an index file
+ if (indexFile == null) return;
+
+ try {
+ String symlink = addSingleFileToDistributedCache(pigContext,
+ conf, indexFile, "indexfile_");
+ join.setIndexFile(symlink);
+ } catch (IOException e) {
+ String msg = "Internal error. Distributed cache could not " +
+ "be set up for merge join index file";
+ throw new VisitorException(msg, e);
+ }
+ }
}
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=915907&r1=915906&r2=915907&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Wed Feb 24 18:13:05 2010
@@ -1286,7 +1286,9 @@
defaultIndexableLoaderArgs[3] = joinOp.getOperatorKey().scope;
defaultIndexableLoaderArgs[4] = origRightLoaderFileSpec.getFileName();
joinOp.setRightLoaderFuncSpec((new FuncSpec(DefaultIndexableLoader.class.getName(), defaultIndexableLoaderArgs)));
- joinOp.setRightInputFileName(origRightLoaderFileSpec.getFileName());
+ joinOp.setRightInputFileName(origRightLoaderFileSpec.getFileName());
+
+ joinOp.setIndexFile(strFile.getFileName());
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java?rev=915907&r1=915906&r2=915907&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java Wed Feb 24 18:13:05 2010
@@ -40,6 +40,7 @@
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.builtin.DefaultIndexableLoader;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
@@ -87,6 +88,8 @@
private FuncSpec rightLoaderFuncSpec;
private String rightInputFileName;
+
+ private String indexFile;
// Buffer to hold accumulated left tuples.
private List<Tuple> leftTuples;
@@ -131,6 +134,7 @@
mTupleFactory = TupleFactory.getInstance();
leftTuples = new ArrayList<Tuple>(arrayListSize);
this.createJoinPlans(inpPlans,keyTypes);
+ this.indexFile = null;
}
/**
@@ -384,9 +388,15 @@
}
}
- @SuppressWarnings("unchecked")
private void seekInRightStream(Object firstLeftKey) throws IOException{
rightLoader = (IndexableLoadFunc)PigContext.instantiateFuncFromSpec(rightLoaderFuncSpec);
+
+ // check if hadoop distributed cache is used
+ if (indexFile != null && rightLoader instanceof DefaultIndexableLoader) {
+ DefaultIndexableLoader loader = (DefaultIndexableLoader)rightLoader;
+ loader.setIndexFile(indexFile);
+ }
+
// Pass signature of the loader to rightLoader
// make a copy of the conf to use in calls to rightLoader.
Configuration conf = new Configuration(PigMapReduce.sJobConf);
@@ -545,4 +555,12 @@
public void setSignature(String signature) {
this.signature = signature;
}
+
+ public void setIndexFile(String indexFile) {
+ this.indexFile = indexFile;
+ }
+
+ public String getIndexFile() {
+ return indexFile;
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java?rev=915907&r1=915906&r2=915907&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java Wed Feb 24 18:13:05 2010
@@ -21,10 +21,13 @@
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
+import java.util.Properties;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.pig.ExecType;
import org.apache.pig.FuncSpec;
import org.apache.pig.IndexableLoadFunc;
import org.apache.pig.LoadCaster;
@@ -37,6 +40,7 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;
@@ -64,7 +68,7 @@
// Index is modeled as FIFO queue and LinkedList implements java Queue interface.
private LinkedList<Tuple> index;
private FuncSpec rightLoaderFuncSpec;
- private PigContext pc;
+
private String scope;
private Tuple dummyTuple = null;
private transient TupleFactory mTupleFactory;
@@ -105,14 +109,11 @@
// the join key
Object firstLeftKey = (keys.size() == 1 ? keys.get(0): keys);
POLoad ld = new POLoad(genKey(), new FileSpec(indexFile, new FuncSpec(indexFileLoadFuncSpec)));
- try {
- pc = (PigContext)ObjectSerializer.deserialize(PigMapReduce.sJobConf.get("pig.pigContext"));
- } catch (IOException e) {
- int errCode = 2094;
- String msg = "Unable to deserialize pig context.";
- throw new ExecException(msg,errCode,e);
- }
- pc.connect();
+
+ Properties props = new Properties();
+ props.setProperty(MapRedUtil.FILE_SYSTEM_NAME, "file:///");
+
+ PigContext pc = new PigContext(ExecType.LOCAL, props);
ld.setPc(pc);
index = new LinkedList<Tuple>();
for(Result res=ld.getNext(dummyTuple);res.returnStatus!=POStatus.STATUS_EOP;res=ld.getNext(dummyTuple))
@@ -191,6 +192,8 @@
}
private void initRightLoader(int [] splitsToBeRead) throws IOException{
+ PigContext pc = (PigContext) ObjectSerializer
+ .deserialize(PigMapReduce.sJobConf.get("pig.pigContext"));
//create ReadToEndLoader that will read the given splits in order
loader = new ReadToEndLoader(
(LoadFunc)PigContext.instantiateFuncFromSpec(rightLoaderFuncSpec),
@@ -255,4 +258,8 @@
// nothing to do
}
+ public void setIndexFile(String indexFile) {
+ this.indexFile = indexFile;
+ }
+
}