You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2009/01/08 02:18:29 UTC
svn commit: r732581 [1/2] - in /hadoop/pig/branches/types: ./
src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/
src/org/apache/pig/backend/hadoop/execu...
Author: gates
Date: Wed Jan 7 17:18:29 2009
New Revision: 732581
URL: http://svn.apache.org/viewvc?rev=732581&view=rev
Log:
PIG-554 Added fragment replicate map side join.
Added:
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOFRJoin.java
hadoop/pig/branches/types/test/org/apache/pig/test/TestFRJoin.java
Modified:
hadoop/pig/branches/types/CHANGES.txt
hadoop/pig/branches/types/src/org/apache/pig/PigServer.java
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
hadoop/pig/branches/types/src/org/apache/pig/data/BagFactory.java
hadoop/pig/branches/types/src/org/apache/pig/data/DefaultBagFactory.java
hadoop/pig/branches/types/src/org/apache/pig/data/DefaultDataBag.java
hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOPrinter.java
hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java
hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/PlanSetter.java
hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java
hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/ImplicitSplitInserter.java
hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalTransformer.java
hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java
hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java
hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
hadoop/pig/branches/types/test/org/apache/pig/test/NonDefaultBagFactory.java
Modified: hadoop/pig/branches/types/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/CHANGES.txt?rev=732581&r1=732580&r2=732581&view=diff
==============================================================================
--- hadoop/pig/branches/types/CHANGES.txt (original)
+++ hadoop/pig/branches/types/CHANGES.txt Wed Jan 7 17:18:29 2009
@@ -6,6 +6,8 @@
NEW FEATURES
+ PIG-554 Added fragment replicate map side join (shravanmn via pkamath and gates)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/pig/branches/types/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/PigServer.java?rev=732581&r1=732580&r2=732581&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/PigServer.java Wed Jan 7 17:18:29 2009
@@ -369,7 +369,7 @@
}
}
- public void dumpSchema(String alias) throws IOException{
+ public Schema dumpSchema(String alias) throws IOException{
try {
LogicalPlan lp = getPlanFromAlias(alias, "describe");
try {
@@ -380,6 +380,7 @@
Schema schema = lp.getLeaves().get(0).getSchema();
if (schema != null) System.out.println(alias + ": " + schema.toString());
else System.out.println("Schema for " + alias + " unknown.");
+ return schema;
} catch (FrontendException fe) {
throw WrappedIOException.wrap(
"Unable to describe schema for alias " + alias, fe);
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=732581&r1=732580&r2=732581&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Wed Jan 7 17:18:29 2009
@@ -47,6 +47,7 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
@@ -778,7 +779,80 @@
}
}
-
+ /**
+ * Compiles a fragment-replicate join into the MR plan.
+ * The operator has one input per joined relation. Every input except the
+ * fragment input is terminated with a store that writes to a temporary file,
+ * and those files are handed to the POFRJoin operator as its replicated files.
+ * The MR operator that produced the fragment input becomes the current MR
+ * operator; the join itself is appended to its map plan (or to its reduce
+ * plan when the map phase is already closed). Each MR operator producing a
+ * replicated input is connected to the current MR operator in the MR plan.
+ * Finally the current MR operator is flagged as an FRJoin job and given the
+ * fragment index and the replicated files.
+ *
+ * @param op the fragment-replicate join operator being compiled
+ * @throws VisitorException wrapping any exception raised while compiling
+ */
+ @Override
+ public void visitFRJoin(POFRJoin op) throws VisitorException {
+ try{
+ FileSpec[] replFiles = new FileSpec[op.getInputs().size()];
+ for (int i=0; i<replFiles.length; i++) {
+ if(i==op.getFragment()) continue; // the fragment slot stays null
+ replFiles[i] = getTempFileSpec();
+ }
+ op.setReplFiles(replFiles);
+
+ List<OperatorKey> opKeys = new ArrayList<OperatorKey>(op.getInputs().size()); // keys of the join inputs, in input order
+ for (PhysicalOperator pop : op.getInputs()) {
+ opKeys.add(pop.getOperatorKey());
+ }
+ int fragPlan = 0; // position in compiledInputs of the MR operator feeding the fragment input
+ for(int i=0;i<compiledInputs.length;i++){
+ MapReduceOper mro = compiledInputs[i];
+ OperatorKey opKey = (!mro.isMapDone()) ? mro.mapPlan.getLeaves().get(0).getOperatorKey()
+ : mro.reducePlan.getLeaves().get(0).getOperatorKey();
+ if(opKeys.indexOf(opKey)==op.getFragment()){
+ curMROp = mro; // the join is added to this MR operator further down
+ fragPlan = i;
+ continue;
+ }
+ POStore str = getStore();
+ str.setSFile(replFiles[opKeys.indexOf(opKey)]); // NOTE(review): indexOf returns -1 if the leaf key is not a join input - confirm this cannot happen
+ if (!mro.isMapDone()) {
+ mro.mapPlan.addAsLeaf(str);
+ mro.setMapDoneSingle(true);
+ } else if (mro.isMapDone() && !mro.isReduceDone()) {
+ mro.reducePlan.addAsLeaf(str);
+ mro.setReduceDone(true);
+ } else {
+ log.error("Both map and reduce phases have been done. This is unexpected while compiling!");
+ throw new PlanException("Both map and reduce phases have been done. This is unexpected while compiling!");
+ }
+ }
+ for(int i=0;i<compiledInputs.length;i++){
+ if(i==fragPlan) continue; // do not connect the fragment's MR operator to itself
+ MRPlan.connect(compiledInputs[i], curMROp);
+ }
+
+ if (!curMROp.isMapDone()) {
+ curMROp.mapPlan.addAsLeaf(op);
+ } else if (curMROp.isMapDone() && !curMROp.isReduceDone()) {
+ curMROp.reducePlan.addAsLeaf(op);
+ } else {
+ log.error("Both map and reduce phases have been done. This is unexpected while compiling!");
+ throw new PlanException("Both map and reduce phases have been done. This is unexpected while compiling!");
+ }
+ List<List<PhysicalPlan>> joinPlans = op.getJoinPlans();
+ if(joinPlans!=null)
+ for (List<PhysicalPlan> joinPlan : joinPlans) {
+ if(joinPlan!=null)
+ for (PhysicalPlan plan : joinPlan) {
+ addUDFs(plan); // applied to every join-key plan of every input
+ }
+ }
+ curMROp.setFrjoin(true);
+ curMROp.setFragment(op.getFragment());
+ curMROp.setReplFiles(op.getReplFiles());
+ }catch(Exception e){
+ VisitorException pe = new VisitorException(e.getMessage());
+ pe.initCause(e); // keep the original exception as the cause
+ throw pe;
+ }
+ }
@Override
public void visitDistinct(PODistinct op) throws VisitorException {
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=732581&r1=732580&r2=732581&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Wed Jan 7 17:18:29 2009
@@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.List;
+import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
@@ -97,6 +98,11 @@
private String scope;
+ //Fragment Replicate Join State
+ boolean frjoin = false;
+ FileSpec[] replFiles = null;
+ int fragment = -1;
+
int requestedParallelism = -1;
// Last POLimit value in this map reduce operator, needed by LimitAdjuster
@@ -293,4 +299,28 @@
public void setStreamInReduce(boolean streamInReduce) {
this.streamInReduce = streamInReduce;
}
+
+    /**
+     * Returns the fragment index recorded as part of the fragment-replicate
+     * join state of this operator.
+     *
+     * @return the value last given to {@code setFragment}, or -1 if none was set
+     */
+    public int getFragment() {
+        return this.fragment;
+    }
+
+    /**
+     * Records the fragment index as part of the fragment-replicate join
+     * state of this operator.
+     *
+     * @param fragment the fragment index to store
+     */
+    public void setFragment(final int fragment) {
+        this.fragment = fragment;
+    }
+
+    /**
+     * Tells whether this operator has been flagged as a fragment-replicate
+     * join job.
+     *
+     * @return the flag last given to {@code setFrjoin}; false if it was never set
+     */
+    public boolean isFrjoin() {
+        return this.frjoin;
+    }
+
+    /**
+     * Flags (or un-flags) this operator as a fragment-replicate join job.
+     *
+     * @param frjoin the new value of the flag
+     */
+    public void setFrjoin(final boolean frjoin) {
+        this.frjoin = frjoin;
+    }
+
+    /**
+     * Returns the replicated-input files recorded as part of the
+     * fragment-replicate join state. The array itself is returned, not a copy.
+     *
+     * @return the array last given to {@code setReplFiles}, or null if none was set
+     */
+    public FileSpec[] getReplFiles() {
+        return this.replFiles;
+    }
+
+    /**
+     * Records the replicated-input files as part of the fragment-replicate
+     * join state. The array is stored as given, without copying.
+     *
+     * @param replFiles the files to store
+     */
+    public void setReplFiles(final FileSpec[] replFiles) {
+        this.replFiles = replFiles;
+    }
}
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=732581&r1=732580&r2=732581&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Wed Jan 7 17:18:29 2009
@@ -702,6 +702,71 @@
poPackage.setInner(cg.getInner());
LogToPhyMap.put(cg, poPackage);
}
+
+
+ /**
+ * Translates a logical fragment-replicate join into a POFRJoin.
+ * For every input, each of its join-column logical plans is walked into a
+ * fresh physical plan; the resulting per-input plan lists (ppLists) are what
+ * the POFRJoin uses to configure its Local Rearrange operators. The key type
+ * recorded for an input is TUPLE when it has more than one join-column plan,
+ * otherwise the result type of the single plan's leaf. The new operator is
+ * added to the current plan, connected to the physical operator of every
+ * input, and registered in LogToPhyMap.
+ *
+ * @param frj the logical fragment-replicate join
+ * @throws VisitorException if an input cannot be connected to the new operator
+ */
+ @Override
+ protected void visit(LOFRJoin frj) throws VisitorException {
+ String scope = frj.getOperatorKey().scope;
+ List<LogicalOperator> inputs = frj.getInputs();
+ List<List<PhysicalPlan>> ppLists = new ArrayList<List<PhysicalPlan>>();
+ List<Byte> keyTypes = new ArrayList<Byte>();
+
+ int fragment = findFrag(inputs,frj.getFragOp());
+ List<PhysicalOperator> inp = new ArrayList<PhysicalOperator>();
+ for (LogicalOperator op : inputs) {
+ inp.add(LogToPhyMap.get(op));
+ List<LogicalPlan> plans = (List<LogicalPlan>) frj.getJoinColPlans()
+ .get(op);
+
+ List<PhysicalPlan> exprPlans = new ArrayList<PhysicalPlan>();
+ currentPlans.push(currentPlan); // saved here, restored after the inner loop
+ for (LogicalPlan lp : plans) {
+ currentPlan = new PhysicalPlan(); // the walk below fills this plan
+ PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker
+ .spawnChildWalker(lp);
+ pushWalker(childWalker);
+ mCurrentWalker.walk(this);
+ exprPlans.add((PhysicalPlan) currentPlan);
+ popWalker();
+
+ }
+ currentPlan = currentPlans.pop();
+ ppLists.add(exprPlans);
+
+ if (plans.size() > 1) {
+ keyTypes.add(DataType.TUPLE);
+ } else {
+ keyTypes.add(exprPlans.get(0).getLeaves().get(0).getResultType()); // NOTE(review): assumes at least one join-column plan per input - confirm
+ }
+ }
+ POFRJoin pfrj = new POFRJoin(new OperatorKey(scope,nodeGen.getNextNodeId(scope)),frj.getRequestedParallelism(),
+ inp, ppLists, keyTypes, null, fragment); // replicated files are passed as null here
+ pfrj.setResultType(DataType.TUPLE);
+ currentPlan.add(pfrj);
+ for (LogicalOperator op : inputs) {
+ try {
+ currentPlan.connect(LogToPhyMap.get(op), pfrj);
+ } catch (PlanException e) {
+ log.error("Invalid physical operators in the physical plan"
+ + e.getMessage());
+ throw new VisitorException(e);
+ }
+ }
+ LogToPhyMap.put(frj, pfrj);
+ }
+
+    /**
+     * Finds the position of the fragment input among the join inputs.
+     * Operators are compared by operator key.
+     *
+     * @param inputs the inputs of the join, in order
+     * @param fragOp the operator designated as the fragment input
+     * @return the zero-based position of fragOp in inputs, or -1 if no input
+     *         has the same operator key
+     */
+    private int findFrag(List<LogicalOperator> inputs, LogicalOperator fragOp) {
+        int i = -1;
+        for (LogicalOperator lop : inputs) {
+            // Advance for every input, matching or not; incrementing only on
+            // the match would report position 0 for any fragment.
+            ++i;
+            if (fragOp.getOperatorKey().equals(lop.getOperatorKey())) {
+                return i;
+            }
+        }
+        return -1;
+    }
@Override
public void visit(LOFilter filter) throws VisitorException {
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=732581&r1=732580&r2=732581&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Wed Jan 7 17:18:29 2009
@@ -219,6 +219,10 @@
public void visitLimit(POLimit lim) throws VisitorException{
//do nothing
}
+
+ public void visitFRJoin(POFRJoin join) throws VisitorException {
+ // Default is a no-op; visitors that need to handle FR joins override this.
+ }
/**
* @param stream
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java?rev=732581&r1=732580&r2=732581&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java Wed Jan 7 17:18:29 2009
@@ -150,6 +150,14 @@
else if(node instanceof POForEach){
sb.append(planString(((POForEach)node).getInputPlans()));
}
+ else if(node instanceof POFRJoin){
+ POFRJoin frj = (POFRJoin)node;
+ List<List<PhysicalPlan>> joinPlans = frj.getJoinPlans();
+ if(joinPlans!=null)
+ for (List<PhysicalPlan> list : joinPlans) {
+ sb.append(planString(list));
+ }
+ }
List<O> originalPredecessors = mPlan.getPredecessors(node);
if (originalPredecessors == null)
Added: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java?rev=732581&view=auto
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java (added)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java Wed Jan 7 17:18:29 2009
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.ExecType;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+
+
+/**
+ * The operator models the join keys using the Local Rearrange operators which
+ * are configured with the plan specified by the user. It also sets up
+ * one Hashtable per replicated input which maps the Key(k) stored as a Tuple
+ * to a DataBag which holds all the values in the input having the same key(k)
+ * The getNext() reads an input from its predecessor and separates them into
+ * key & value. It configures a foreach operator with the databags obtained from
+ * each Hashtable for the key and also with the value for the fragment input.
+ * It then returns tuples returned by this foreach operator.
+ */
+public class POFRJoin extends PhysicalOperator {
+ /**
+ *
+ */
+ private static final long serialVersionUID = 1L;
+ private Log log = LogFactory.getLog(getClass());
+ //The number in the input list which denotes the fragmented input
+ private int fragment;
+ //There can be n inputs each being a List<PhysicalPlan>
+ //Ex. join A by ($0+$1,$0-$1), B by ($0*$1,$0/$1);
+ private List<List<PhysicalPlan>> phyPlanLists;
+ //The key type for each Local Rearrange operator
+ private List<Byte> keyTypes;
+ //The Local Rearrange operators modeling the join key
+ private POLocalRearrange[] LRs;
+ //The set of files that represent the replicated inputs
+ private FileSpec[] replFiles;
+ //Used to configure the foreach operator
+ private ConstantExpression[] constExps;
+ //Used to produce the cross product of various bags
+ private POForEach fe;
+ //The array of Hashtables one per replicated input. replicates[fragment] = null
+ private Map<Tuple,List<Tuple>> replicates[];
+ //variable which denotes whether we are returning tuples from the foreach operator
+ private boolean processingPlan;
+ //A dummy tuple
+ private Tuple dumTup = TupleFactory.getInstance().newTuple(1);
+ //An instance of tuple factory
+ private transient TupleFactory mTupleFactory;
+ private transient BagFactory mBagFactory;
+ private boolean setUp;
+
+    /**
+     * Creates an operator with no inputs, no join plans and no fragment.
+     *
+     * @param k the operator key
+     */
+    public POFRJoin(OperatorKey k) throws PlanException {
+        this(k,-1,null, null, null, null, -1);
+    }
+
+    /**
+     * Creates an operator with no inputs, no join plans and no fragment.
+     *
+     * @param k the operator key
+     * @param rp the requested parallelism, passed to the superclass
+     */
+    public POFRJoin(OperatorKey k, int rp) throws PlanException {
+        this(k, rp, null, null, null, null, -1);
+    }
+
+    /**
+     * Creates an operator with the given inputs but no join plans and no fragment.
+     *
+     * @param k the operator key
+     * @param inp the input operators, passed to the superclass
+     */
+    public POFRJoin(OperatorKey k, List<PhysicalOperator> inp) throws PlanException {
+        this(k, -1, inp, null, null, null, -1);
+    }
+
+    /**
+     * Creates an operator with the given inputs but no join plans and no fragment.
+     *
+     * @param k the operator key
+     * @param rp the requested parallelism, passed to the superclass
+     * @param inp the input operators, passed to the superclass
+     */
+    public POFRJoin(OperatorKey k, int rp, List<PhysicalOperator> inp) throws PlanException {
+        this(k,rp,inp,null, null, null, -1);
+    }
+
+    /**
+     * Creates a fully specified operator.
+     * When ppLists is null (as it is for the shorter constructors) the
+     * per-input arrays are created empty and no Local Rearrange / foreach
+     * operators are built, instead of failing with a NullPointerException.
+     *
+     * @param k the operator key
+     * @param rp the requested parallelism, passed to the superclass
+     * @param inp the input operators, passed to the superclass
+     * @param ppLists one list of join-key plans per input; may be null
+     * @param keyTypes the key type for each input's Local Rearrange operator;
+     *        must have an entry for every element of ppLists
+     * @param replFiles the files holding the replicated inputs; may be null
+     *        and supplied later through setReplFiles
+     * @param fragment the position of the fragment input in the input list
+     */
+    @SuppressWarnings("unchecked") // generic array creation for 'replicates'
+    public POFRJoin(OperatorKey k, int rp, List<PhysicalOperator> inp, List<List<PhysicalPlan>> ppLists, List<Byte> keyTypes, FileSpec[] replFiles, int fragment){
+        super(k,rp,inp);
+
+        phyPlanLists = ppLists;
+        this.fragment = fragment;
+        this.keyTypes = keyTypes;
+        this.replFiles = replFiles;
+        // The convenience constructors pass null join plans; size the
+        // per-input state for zero inputs in that case.
+        int numInputs = (ppLists == null) ? 0 : ppLists.size();
+        replicates = new Map[numInputs];
+        LRs = new POLocalRearrange[numInputs];
+        constExps = new ConstantExpression[numInputs];
+        if (ppLists != null) {
+            createJoinPlans(k);
+        }
+        processingPlan = false;
+        mTupleFactory = TupleFactory.getInstance();
+        mBagFactory = BagFactory.getInstance();
+    }
+
+    /**
+     * Returns the join-key plans, one list of plans per input. This is the
+     * list handed to the constructor, not a copy.
+     *
+     * @return the per-input lists of join-key plans
+     */
+    public List<List<PhysicalPlan>> getJoinPlans(){
+        return this.phyPlanLists;
+    }
+
+    /**
+     * Creates a new operator key in the same scope as an existing key, using
+     * the next node id handed out by the node id generator for that scope.
+     *
+     * @param old the key whose scope is reused
+     * @return a new key in old's scope
+     */
+    private OperatorKey genKey(OperatorKey old){
+        String scope = old.scope;
+        long nextId = NodeIdGenerator.getGenerator().getNextNodeId(scope);
+        return new OperatorKey(scope, nextId);
+    }
+
+ /**
+ * Configures the Local Rearrange operators & the foreach operator
+ * @param old
+ */
+ private void createJoinPlans(OperatorKey old){
+ List<PhysicalPlan> fePlans = new ArrayList<PhysicalPlan>();
+ List<Boolean> flatList = new ArrayList<Boolean>();
+
+ int i=-1;
+ for (List<PhysicalPlan> ppLst : phyPlanLists) {
+ ++i;
+ POLocalRearrange lr = new POLocalRearrange(genKey(old));
+ lr.setIndex(i);
+ lr.setResultType(DataType.TUPLE);
+ lr.setKeyType(keyTypes.get(i));
+ lr.setPlans(ppLst);
+ LRs[i]= lr;
+ ConstantExpression ce = new ConstantExpression(genKey(old));
+ ce.setResultType((i==fragment)?DataType.TUPLE:DataType.BAG);
+ constExps[i] = ce;
+ PhysicalPlan pp = new PhysicalPlan();
+ pp.add(ce);
+ fePlans.add(pp);
+ flatList.add(true);
+ }
+ fe = new POForEach(genKey(old),-1,fePlans,flatList);
+ }
+
+ @Override
+ public void visit(PhyPlanVisitor v) throws VisitorException {
+ v.visitFRJoin(this); // hand this operator to the visitor's FRJoin callback
+ }
+
+ @Override
+ public String name() {
+ return "FRJoin[" + DataType.findTypeName(resultType) + "]" +" - " + mKey.toString();
+ }
+
+ @Override
+ public boolean supportsMultipleInputs() {
+ // The join takes one input per joined relation, so it reports true.
+ return true;
+ }
+
+ @Override
+ public boolean supportsMultipleOutputs() {
+ // The join always reports that it does not support multiple outputs.
+ return false;
+ }
+
+    /**
+     * Returns the next joined tuple.
+     * On the first call the hash maps for the replicated inputs are built.
+     * After that the method alternates between two phases: while the foreach
+     * operator is configured for a fragment tuple, its output is handed back
+     * one tuple per call; once it is exhausted, the next fragment tuple is
+     * read, split into key and value by the fragment's Local Rearrange
+     * operator, and looked up in every replicated hash map. Fragment tuples
+     * whose key is missing from any replicated input are skipped.
+     *
+     * @param t not used
+     * @return a result with STATUS_OK and a joined tuple; the input's result
+     *         when it reports STATUS_EOP or STATUS_ERR; the foreach
+     *         operator's result when it reports STATUS_ERR; or a
+     *         STATUS_ERR result when the Local Rearrange operator fails
+     * @throws ExecException if building the hash maps or evaluating a key fails
+     */
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+        if(!setUp){
+            setUpHashMap();
+            setUp = true;
+        }
+        // A single loop replaces the former self-recursive call, so the stack
+        // depth no longer depends on the data.
+        while (true) {
+            if(processingPlan){
+                // Return tuples from the foreach operator, which is
+                // configured with the bags for the current key.
+                Result res = fe.getNext(dummyTuple);
+                if(res.returnStatus==POStatus.STATUS_OK
+                        || res.returnStatus==POStatus.STATUS_ERR){
+                    return res;
+                }
+                if(res.returnStatus==POStatus.STATUS_EOP){
+                    // Cross product for this fragment tuple is exhausted.
+                    processingPlan = false;
+                }
+                // STATUS_NULL: ask the foreach operator again.
+                // STATUS_EOP: fall through to the next fragment tuple.
+                continue;
+            }
+
+            // Process the current input
+            Result inp = processInput();
+            if (inp.returnStatus == POStatus.STATUS_EOP
+                    || inp.returnStatus == POStatus.STATUS_ERR) {
+                return inp;
+            }
+            if (inp.returnStatus == POStatus.STATUS_NULL) {
+                continue;
+            }
+
+            // Separate key and value using the fragment's LR operator
+            POLocalRearrange lr = LRs[fragment];
+            lr.attachInput((Tuple)inp.result);
+            Result lrOut = lr.getNext(dummyTuple);
+            if(lrOut.returnStatus!=POStatus.STATUS_OK) {
+                log.error("LocalRearrange isn't configured right or is not working");
+                // Report the failure explicitly rather than relying on the
+                // status a freshly constructed Result happens to carry.
+                Result err = new Result();
+                err.returnStatus = POStatus.STATUS_ERR;
+                return err;
+            }
+            Tuple lrOutTuple = (Tuple) lrOut.result;
+            Tuple key = mTupleFactory.newTuple(1);
+            key.set(0,lrOutTuple.get(1));
+            Tuple value = getValueTuple(lr, lrOutTuple);
+
+            // Configure the foreach operator with the relevant bags
+            boolean noMatch = false;
+            for (int i = 0; i < constExps.length; i++) {
+                if(i==fragment){
+                    constExps[i].setValue(value);
+                    continue;
+                }
+                // Values stored in the hash maps are never null, so a null
+                // lookup means the key is absent from this replicated input.
+                List<Tuple> matches = replicates[i].get(key);
+                if(matches == null){
+                    noMatch = true;
+                    break;
+                }
+                // NOTE(review): the bag wraps the list held in the hash map
+                // without copying it - confirm nothing mutates the bag's
+                // backing list afterwards.
+                constExps[i].setValue(mBagFactory.newDefaultBag(matches));
+            }
+            if(noMatch) {
+                continue;
+            }
+            fe.attachInput(dumTup);
+            processingPlan = true;
+            // The next loop iteration starts draining the foreach operator.
+        }
+    }
+
+    /**
+     * Builds one hash map per replicated input by reading that input's file
+     * through a load operator feeding the input's Local Rearrange operator.
+     * Each map goes from the join key (wrapped in a one-field tuple) to the
+     * list of value tuples carrying that key. The slot of the fragment input
+     * is set to null.
+     *
+     * @throws ExecException if connecting the context fails, if a replicated
+     *         input reports STATUS_ERR, or if a key or value cannot be extracted
+     */
+    private void setUpHashMap() throws ExecException {
+        long time1 = System.currentTimeMillis();
+        for (int i = 0; i < replFiles.length; i++) {
+            if(i==fragment){
+                replicates[i] = null;
+                continue;
+            }
+
+            POLoad ld = new POLoad(new OperatorKey("Repl File Loader", 1L), replFiles[i], false);
+            PigContext pc = new PigContext(ExecType.MAPREDUCE,ConfigurationUtil.toProperties(PigMapReduce.sJobConf));
+            pc.connect();
+            ld.setPc(pc);
+            POLocalRearrange lr = LRs[i];
+            lr.setInputs(Arrays.asList((PhysicalOperator)ld));
+            Map<Tuple, List<Tuple>> replicate = new HashMap<Tuple, List<Tuple>>(1000);
+            log.debug("Completed setup. Trying to build replication hash table");
+            while (true) {
+                Result res = lr.getNext(dummyTuple);
+                if (res.returnStatus == POStatus.STATUS_EOP) {
+                    break;
+                }
+                if (res.returnStatus == POStatus.STATUS_ERR) {
+                    // Previously an error result was treated like data; fail
+                    // loudly instead of building a partial hash table.
+                    throw new ExecException("Error while reading replicated input "
+                            + i + " of " + name());
+                }
+                if(reporter!=null) reporter.progress();
+                if (res.returnStatus == POStatus.STATUS_NULL) {
+                    // Nothing to store for this call; keep reading.
+                    continue;
+                }
+                Tuple tuple = (Tuple) res.result;
+                Tuple key = mTupleFactory.newTuple(1);
+                key.set(0,tuple.get(1));
+                Tuple value = getValueTuple(lr, tuple);
+                List<Tuple> values = replicate.get(key);
+                if (values == null) {
+                    values = new ArrayList<Tuple>();
+                    replicate.put(key, values);
+                }
+                values.add(value);
+            }
+            replicates[i] = replicate;
+        }
+        long time2 = System.currentTimeMillis();
+        log.debug("Hash Table built. Time taken: " + (time2-time1));
+    }
+
+ private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException, ExecException{
+ is.defaultReadObject();
+ mTupleFactory = TupleFactory.getInstance(); // transient field, so it is re-acquired after deserialization
+ mBagFactory = BagFactory.getInstance(); // transient field, so it is re-acquired after deserialization
+// The hash tables are deliberately not built here; getNext() builds them on its first call.
+ }
+
+    /*
+     * Rebuilds the full value tuple from the output of a Local Rearrange
+     * operator. Field 1 of that output is the key and field 2 the value.
+     * Three cases are distinguished: some value fields live in the key
+     * (they are stitched back in at their original positions), the whole
+     * value lives in the key (project star), or the value is already
+     * complete and is simply copied.
+     */
+    private Tuple getValueTuple(POLocalRearrange lr, Tuple tuple) throws ExecException {
+        Tuple valuePart = (Tuple) tuple.get(2);
+        boolean projectStar = lr.isProjectStar();
+        Map<Integer, Integer> keyPositions = lr.getProjectedColsMap();
+        int fieldsInKey = keyPositions.size();
+        Object rawKey = tuple.get(1);
+        boolean keyIsTuple = lr.isKeyTuple();
+        Tuple tupleKey = keyIsTuple ? (Tuple)tuple.get(1) : null;
+
+        if (fieldsInKey > 0) {
+            // Some fields of the value were moved into the key: interleave
+            // key fields and remaining value fields in their original order.
+            Tuple rebuilt = mTupleFactory.newTuple();
+            int totalFields = fieldsInKey + valuePart.size();
+            int nextValueField = 0; // next unread field of valuePart
+            for (int pos = 0; pos < totalFields; pos++) {
+                Integer posInKey = keyPositions.get(pos);
+                if (posInKey != null) {
+                    // this position is served by the key
+                    if (keyIsTuple) {
+                        rebuilt.append(tupleKey.get(posInKey));
+                    } else {
+                        rebuilt.append(rawKey);
+                    }
+                } else {
+                    // this position is served by the value we were handed
+                    rebuilt.append(valuePart.get(nextValueField));
+                    nextValueField++;
+                }
+            }
+            return rebuilt;
+        }
+
+        if (projectStar) {
+            // the whole value is present in the key
+            return mTupleFactory.newTuple(tupleKey.getAll());
+        }
+
+        // no field of the value is in the key: copy the value as it is
+        return mTupleFactory.newTuple(valuePart.getAll());
+    }
+
+    /**
+     * Returns the position in the input list of the fragmented input.
+     *
+     * @return the fragment index
+     */
+    public int getFragment() {
+        return this.fragment;
+    }
+
+    /**
+     * Sets the position in the input list of the fragmented input.
+     *
+     * @param fragment the fragment index
+     */
+    public void setFragment(final int fragment) {
+        this.fragment = fragment;
+    }
+
+    /**
+     * Returns the files that represent the replicated inputs. The array
+     * itself is returned, not a copy.
+     *
+     * @return the replicated-input files
+     */
+    public FileSpec[] getReplFiles() {
+        return this.replFiles;
+    }
+
+    /**
+     * Sets the files that represent the replicated inputs. The array is
+     * stored as given, without copying.
+     *
+     * @param replFiles the replicated-input files
+     */
+    public void setReplFiles(final FileSpec[] replFiles) {
+        this.replFiles = replFiles;
+    }
+}
Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java?rev=732581&r1=732580&r2=732581&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java Wed Jan 7 17:18:29 2009
@@ -77,6 +77,7 @@
public POLoad(OperatorKey k, int rp, FileSpec lFile,boolean splittable) {
super(k, rp);
+ this.lFile = lFile;
this.splittable = splittable;
}
Modified: hadoop/pig/branches/types/src/org/apache/pig/data/BagFactory.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/data/BagFactory.java?rev=732581&r1=732580&r2=732581&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/data/BagFactory.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/data/BagFactory.java Wed Jan 7 17:18:29 2009
@@ -22,6 +22,7 @@
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Comparator;
+import java.util.List;
import org.apache.pig.impl.util.SpillableMemoryManager;
@@ -84,6 +85,12 @@
public abstract DataBag newDefaultBag();
/**
+ * Get a default (unordered, not distinct) data bag from
+ * an existing list of tuples.
+ */
+ public abstract DataBag newDefaultBag(List<Tuple> listOfTuples);
+
+ /**
* Get a sorted data bag.
* @param comp Comparator that controls how the data is sorted.
* If null, default comparator will be used.
Modified: hadoop/pig/branches/types/src/org/apache/pig/data/DefaultBagFactory.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/data/DefaultBagFactory.java?rev=732581&r1=732580&r2=732581&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/data/DefaultBagFactory.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/data/DefaultBagFactory.java Wed Jan 7 17:18:29 2009
@@ -18,6 +18,7 @@
package org.apache.pig.data;
import java.util.Comparator;
+import java.util.List;
import org.apache.pig.impl.util.SpillableMemoryManager;
@@ -33,6 +34,18 @@
registerBag(b);
return b;
}
+
+    /**
+     * Wraps an existing list of tuples in a default (unordered, not
+     * distinct) data bag and registers the bag. The tuples are NOT copied:
+     * the supplied list becomes the bag's backing store, so the bag takes
+     * ownership of the list.
+     *
+     * @param listOfTuples the list that will back the new bag
+     * @return the new, registered bag
+     */
+    public DataBag newDefaultBag(List<Tuple> listOfTuples) {
+        DataBag wrapped = new DefaultDataBag(listOfTuples);
+        registerBag(wrapped);
+        return wrapped;
+    }
/**
* Get a sorted data bag.
Modified: hadoop/pig/branches/types/src/org/apache/pig/data/DefaultDataBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/data/DefaultDataBag.java?rev=732581&r1=732580&r2=732581&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/data/DefaultDataBag.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/data/DefaultDataBag.java Wed Jan 7 17:18:29 2009
@@ -25,6 +25,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import java.io.FileNotFoundException;
import org.apache.commons.logging.Log;
@@ -51,6 +52,16 @@
mContents = new ArrayList<Tuple>();
}
+    /**
+     * Creates a bag backed directly by an existing list of tuples. The bag
+     * takes ownership of the list; its contents are NOT copied.
+     *
+     * @param listOfTuples the list that becomes the contents of this bag
+     */
+    public DefaultDataBag(List<Tuple> listOfTuples) {
+        this.mContents = listOfTuples;
+    }
+
public boolean isSorted() {
return false;
}
Added: hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOFRJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOFRJoin.java?rev=732581&view=auto
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOFRJoin.java (added)
+++ hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOFRJoin.java Wed Jan 7 17:18:29 2009
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.logicalLayer;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.optimizer.SchemaRemover;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.MultiMap;
+
+/**
+ * This is the logical operator for the Fragment Replicate Join.
+ * It holds the user specified information and is responsible for
+ * the schema computation. This mimics the LOCogroup operator except
+ * for the schema computation.
+ */
+public class LOFRJoin extends LogicalOperator {
+    private static final long serialVersionUID = 2L;
+
+    private static Log log = LogFactory.getLog(LOFRJoin.class);
+    // Join column plans, keyed by the input operator they apply to.
+    private MultiMap<LogicalOperator, LogicalPlan> mJoinColPlans;
+    // The input operator handed in as the fragment input.
+    private LogicalOperator fragOp;
+
+    /**
+     * @param plan logical plan this operator is a part of
+     * @param k operator key to assign to this operator
+     * @param joinColPlans join column plans keyed by input operator
+     * @param isInner accepted for signature compatibility but not stored
+     * @param fragOp the input operator to use as the fragment input
+     */
+    public LOFRJoin(
+            LogicalPlan plan,
+            OperatorKey k,
+            MultiMap<LogicalOperator, LogicalPlan> joinColPlans,
+            boolean[] isInner, LogicalOperator fragOp) {
+        super(plan, k);
+        mJoinColPlans = joinColPlans;
+        this.fragOp = fragOp;
+    }
+
+    /**
+     * Uses the schema from its input operators and dedups
+     * those fields that have the same alias and sets the
+     * schema for the join. Every field is first added under the
+     * name inputAlias::fieldAlias; a field whose alias occurs exactly
+     * once across all inputs is then renamed back to its plain alias.
+     * An input without a schema contributes a single bytearray field
+     * with no alias.
+     * @return the (possibly cached) schema of the join
+     * @throws FrontendException if the schema of an input cannot be
+     * obtained; the cached schema is reset in that case
+     */
+    @Override
+    public Schema getSchema() throws FrontendException {
+        List<LogicalOperator> inputs = mPlan.getPredecessors(this);
+        mType = DataType.BAG;//mType is from the super class
+        if(!mIsSchemaComputed){
+            // alias -> position in fss of its only occurrence so far,
+            // or -1 once the alias has been seen more than once
+            Hashtable<String, Integer> nonDuplicates = new Hashtable<String, Integer>();
+            List<Schema.FieldSchema> fss = new ArrayList<Schema.FieldSchema>();
+            for (LogicalOperator op : inputs) {
+                try {
+                    Schema cSchema = op.getSchema();
+                    if(cSchema == null) {
+                        fss.add(new FieldSchema(null,DataType.BYTEARRAY));
+                        continue;
+                    }
+                    for (FieldSchema schema : cSchema.getFields()) {
+                        String alias = schema.alias;
+                        String qualifiedAlias = null;
+                        // Hashtable rejects null keys, so fields without
+                        // an alias are kept out of the dedup bookkeeping.
+                        if(alias != null) {
+                            // The position is taken from fss itself so it
+                            // stays correct even after placeholder fields
+                            // were added for schema-less inputs.
+                            nonDuplicates.put(alias,
+                                    nonDuplicates.containsKey(alias) ? -1 : fss.size());
+                            qualifiedAlias = op.getAlias()+"::"+alias;
+                        }
+                        FieldSchema newFS = new FieldSchema(qualifiedAlias,schema.schema,schema.type);
+                        newFS.setParent(schema.canonicalName, op);
+                        fss.add(newFS);
+                    }
+                } catch (FrontendException ioe) {
+                    mIsSchemaComputed = false;
+                    mSchema = null;
+                    throw ioe;
+                }
+            }
+            mIsSchemaComputed = true;
+            // Fields whose alias is unique get their plain alias back.
+            for (Entry<String, Integer> ent : nonDuplicates.entrySet()) {
+                int ind = ent.getValue();
+                if(ind==-1) continue;
+                FieldSchema prevSch = fss.get(ind);
+                fss.set(ind, new FieldSchema(ent.getKey(),prevSch.schema,prevSch.type));
+            }
+            mSchema = new Schema(fss);
+        }
+        return mSchema;
+    }
+
+    /**
+     * @return the join column plans keyed by input operator
+     */
+    public MultiMap<LogicalOperator, LogicalPlan> getJoinColPlans() {
+        return mJoinColPlans;
+    }
+
+    /**
+     * Re-keys the join column plans registered for oldOp so that they
+     * belong to newOp. If oldOp was the fragment input (compared by
+     * operator key), newOp becomes the fragment input.
+     * @param oldOp input operator being replaced
+     * @param newOp input operator taking its place
+     */
+    public void switchJoinColPlanOp(LogicalOperator oldOp,
+            LogicalOperator newOp) {
+        Collection<LogicalPlan> innerPlans = mJoinColPlans.removeKey(oldOp) ;
+        mJoinColPlans.put(newOp, innerPlans);
+        if(fragOp.getOperatorKey().equals(oldOp.getOperatorKey()))
+            fragOp = newOp;
+    }
+
+    /**
+     * Runs a SchemaRemover over every join column plan of every input
+     * and then unsets the schema of this operator itself.
+     */
+    public void unsetSchema() throws VisitorException{
+        for(LogicalOperator input: getInputs()) {
+            Collection<LogicalPlan> grpPlans = mJoinColPlans.get(input);
+            if(grpPlans!=null)
+                for(LogicalPlan plan : grpPlans) {
+                    SchemaRemover sr = new SchemaRemover(plan);
+                    sr.visit();
+                }
+        }
+        super.unsetSchema();
+    }
+
+    /**
+     * @return the predecessors of this operator in its plan
+     */
+    public List<LogicalOperator> getInputs() {
+        return mPlan.getPredecessors(this);
+    }
+
+    @Override
+    public void visit(LOVisitor v) throws VisitorException {
+        v.visit(this);
+    }
+
+    @Override
+    public String name() {
+        return "FRJoin " + mKey.scope + "-" + mKey.id;
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return true;
+    }
+
+    public LogicalOperator getFragOp() {
+        return fragOp;
+    }
+
+    public void setFragOp(LogicalOperator fragOp) {
+        this.fragOp = fragOp;
+    }
+
+    /**
+     * @return true if the first input has more than one join column plan
+     */
+    public boolean isTupleJoinCol() {
+        List<LogicalOperator> inputs = mPlan.getPredecessors(this);
+        if (inputs == null || inputs.size() == 0) {
+            throw new AssertionError("join.isTuplejoinCol() can be called "
+                                     + "after it has an input only") ;
+        }
+        // NOTE: we depend on the number of inner plans to determine
+        // if the join col is a tuple. This could be an issue when there
+        // is only one inner plan with Project(*). For that case if the
+        // corresponding input to the Project had a schema then the front end
+        // would translate the single Project(*) (through ProjectStarTranslator)
+        // to many individual Projects. So the number of inner plans would then
+        // be > 1 BEFORE reaching here. For the Project(*) case when the corresponding
+        // input for the Project has no schema, treating it as an atomic col join
+        // does not cause any problems since no casts need to be inserted in that case
+        // anyway.
+        return mJoinColPlans.get(inputs.get(0)).size() > 1 ;
+    }
+
+    /**
+     * Merges the output types of the single join column plan of every
+     * input, starting from bytearray, into the type of the join column.
+     * @return the merged join column type
+     * @throws FrontendException if the join column is a tuple, or if an
+     * input does not have exactly one join column plan
+     */
+    public byte getAtomicJoinColType() throws FrontendException {
+        if (isTupleJoinCol()) {
+            throw new FrontendException("getAtomicJoinColType is used only when"
+                                        + " dealing with atomic join col") ;
+        }
+
+        byte joinColType = DataType.BYTEARRAY ;
+        // merge all the inner plan outputs so we know what type
+        // our join column should be
+        for (LogicalOperator input : getInputs()) {
+            List<LogicalPlan> innerPlans
+                        = new ArrayList<LogicalPlan>(getJoinColPlans().get(input)) ;
+            if (innerPlans.size() != 1) {
+                throw new FrontendException("Each join input has to have "
+                                            + "the same number of inner plans") ;
+            }
+            byte innerType = innerPlans.get(0).getSingleLeafPlanOutputType() ;
+            joinColType = DataType.mergeType(joinColType, innerType) ;
+        }
+
+        return joinColType ;
+    }
+
+    /**
+     * Builds the schema of a multi-column join key. There is one field
+     * per join column plan of the first input; each starts as bytearray
+     * and is merged with the output type of the plan at the same
+     * position in every input.
+     * @return the schema of the join key; its fields have no aliases
+     * @throws FrontendException if the join column is atomic, or if an
+     * input mixes a star projection with other join expressions
+     */
+    public Schema getTupleJoinColSchema() throws FrontendException {
+        if (!isTupleJoinCol()) {
+            throw new FrontendException("getTupleJoinColSchema is used only when"
+                                        + " dealing with tuple join col") ;
+        }
+
+        // this fsList represents all the columns in join tuple
+        List<Schema.FieldSchema> fsList = new ArrayList<Schema.FieldSchema>() ;
+
+        int outputSchemaSize = getJoinColPlans().get(getInputs().get(0)).size() ;
+
+        // by default, they are all bytearray
+        // for type checking, we don't care about aliases
+        for(int i=0; i<outputSchemaSize; i++) {
+            fsList.add(new Schema.FieldSchema(null, DataType.BYTEARRAY)) ;
+        }
+
+        // merge all the inner plan outputs so we know what type
+        // our join column should be
+        for (LogicalOperator input : getInputs()) {
+            List<LogicalPlan> innerPlans
+                        = new ArrayList<LogicalPlan>(getJoinColPlans().get(input)) ;
+
+            boolean seenProjectStar = false;
+            for(int j=0;j < innerPlans.size(); j++) {
+                LogicalPlan innerPlan = innerPlans.get(j);
+                byte innerType = innerPlan.getSingleLeafPlanOutputType() ;
+                ExpressionOperator eOp = (ExpressionOperator)innerPlan.getSingleLeafPlanOutputOp();
+
+                if(eOp instanceof LOProject && ((LOProject)eOp).isStar()) {
+                    seenProjectStar = true;
+                }
+
+                Schema.FieldSchema joinFs = fsList.get(j);
+                joinFs.type = DataType.mergeType(joinFs.type, innerType) ;
+                Schema.FieldSchema fs = eOp.getFieldSchema();
+                if(null != fs) {
+                    joinFs.setParent(fs.canonicalName, eOp);
+                } else {
+                    joinFs.setParent(null, eOp);
+                }
+            }
+
+            if(seenProjectStar && innerPlans.size() > 1) {
+                throw new FrontendException("joining attributes can either be star (*) or a list of expressions, but not both.");
+            }
+        }
+
+        return new Schema(fsList) ;
+    }
+
+}
Modified: hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOPrinter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOPrinter.java?rev=732581&r1=732580&r2=732581&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOPrinter.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOPrinter.java Wed Jan 7 17:18:29 2009
@@ -146,6 +146,15 @@
}
}
}
+ else if(node instanceof LOFRJoin){
+ MultiMap<LogicalOperator, LogicalPlan> plans = ((LOFRJoin)node).getJoinColPlans();
+ for (LogicalOperator lo : plans.keySet()) {
+ // Visit the associated plans
+ for (LogicalPlan plan : plans.get(lo)) {
+ sb.append(planString(plan));
+ }
+ }
+ }
else if(node instanceof LOSort){
sb.append(planString(((LOSort)node).getSortColPlans()));
}
Modified: hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java?rev=732581&r1=732580&r2=732581&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java Wed Jan 7 17:18:29 2009
@@ -135,6 +135,24 @@
}
}
}
+
+    /**
+     * Walks every join column plan of every input of the fragment
+     * replicate join with this visitor.
+     * @param frj the fragment replicate join being visited
+     * @throws VisitorException
+     */
+    protected void visit(LOFRJoin frj) throws VisitorException {
+        MultiMap<LogicalOperator, LogicalPlan> joinPlans = frj.getJoinColPlans();
+        for(LogicalOperator input: frj.getInputs()) {
+            for(LogicalPlan innerPlan: joinPlans.get(input)) {
+                if (innerPlan == null) {
+                    continue;
+                }
+                // TODO FIX - How do we know this should be a
+                // DependencyOrderWalker? We should be replicating the
+                // walker the current visitor is using.
+                PlanWalker walker = new DependencyOrderWalker(innerPlan);
+                pushWalker(walker);
+                walker.walk(this);
+                popWalker();
+            }
+        }
+    }
/**
*
Modified: hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/PlanSetter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/PlanSetter.java?rev=732581&r1=732580&r2=732581&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/PlanSetter.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/PlanSetter.java Wed Jan 7 17:18:29 2009
@@ -57,6 +57,11 @@
op.setPlan(mCurrentWalker.getPlan());
super.visit(op);
}
+
+    /**
+     * Points the fragment replicate join at the plan of the walker
+     * currently in use, then continues with the superclass visit.
+     */
+    public void visit(LOFRJoin op) throws VisitorException {
+        LogicalPlan currentPlan = mCurrentWalker.getPlan();
+        op.setPlan(currentPlan);
+        super.visit(op);
+    }
public void visit(LOConst op) throws VisitorException {
op.setPlan(mCurrentWalker.getPlan());
Modified: hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java?rev=732581&r1=732580&r2=732581&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ProjectStarTranslator.java Wed Jan 7 17:18:29 2009
@@ -74,6 +74,32 @@
mapGByPlans.put(op, newGByPlans);
}
}
+
+    /**
+     * Rewrites the join column plans of a fragment replicate join: for
+     * every input, each plan for which checkPlanForProjectStar is true is
+     * replaced by the plans returned from translateProjectStarInPlan;
+     * all other plans are kept, and the original order is preserved.
+     * @see org.apache.pig.impl.logicalLayer.LOVisitor#visit(org.apache.pig.impl.logicalLayer.LOFRJoin)
+     */
+    @Override
+    protected void visit(LOFRJoin frj) throws VisitorException {
+        // the multimap is owned by the join and is modified in place
+        MultiMap<LogicalOperator, LogicalPlan> plansByInput = frj.getJoinColPlans();
+
+        for(LogicalOperator input: frj.getInputs()) {
+            ArrayList<LogicalPlan> expanded = new ArrayList<LogicalPlan>();
+            for(LogicalPlan candidate: plansByInput.get(input)) {
+                if (!checkPlanForProjectStar(candidate)) {
+                    expanded.add(candidate);
+                    continue;
+                }
+                ArrayList<LogicalPlan> replacements = translateProjectStarInPlan(candidate);
+                expanded.addAll(replacements);
+            }
+            plansByInput.removeKey(input);
+            plansByInput.put(input, expanded);
+        }
+    }
/**
*
Modified: hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/ImplicitSplitInserter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/ImplicitSplitInserter.java?rev=732581&r1=732580&r2=732581&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/ImplicitSplitInserter.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/ImplicitSplitInserter.java Wed Jan 7 17:18:29 2009
@@ -60,6 +60,7 @@
NodeIdGenerator idGen = NodeIdGenerator.getGenerator();
LOSplit splitOp = new LOSplit(mPlan, new OperatorKey(scope,
idGen.getNextNodeId(scope)), new ArrayList<LogicalOperator>());
+ splitOp.setAlias(nodes.get(0).getAlias());
try {
mPlan.add(splitOp);
@@ -136,6 +137,7 @@
splitOp.addOutput(splitOutput);
mPlan.add(splitOutput);
mPlan.insertBetween(splitOp, splitOutput, succ);
+ splitOutput.setAlias(splitOp.getAlias());
// Patch up the contained plans of succ
fixUpContainedPlans(nodes.get(0), splitOutput, succ, null);
}
Modified: hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalTransformer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalTransformer.java?rev=732581&r1=732580&r2=732581&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalTransformer.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalTransformer.java Wed Jan 7 17:18:29 2009
@@ -30,6 +30,7 @@
import org.apache.pig.impl.plan.optimizer.Transformer;
import org.apache.pig.impl.plan.optimizer.Transformer;
import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.LOFRJoin;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.logicalLayer.LOCogroup;
@@ -182,6 +183,10 @@
LOCogroup cg = (LOCogroup) before ;
cg.switchGroupByPlanOp(after, newNode);
}
+ if (before instanceof LOFRJoin) {
+ LOFRJoin frj = (LOFRJoin) before ;
+ frj.switchJoinColPlanOp(after, newNode);
+ }
// Visit all the inner plans of before and change their projects to
// connect to newNode instead of after.
@@ -189,7 +194,10 @@
List<LogicalPlan> plans = new ArrayList<LogicalPlan>();
if (before instanceof LOCogroup) {
plans.addAll((((LOCogroup)before).getGroupByPlans()).values());
- } else if (before instanceof LOSort) {
+ } else if (before instanceof LOFRJoin) {
+ plans.addAll((((LOFRJoin)before).getJoinColPlans()).values());
+ }
+ else if (before instanceof LOSort) {
plans.addAll(((LOSort)before).getSortColPlans());
} else if (before instanceof LOFilter) {
plans.add(((LOFilter)before).getComparisonPlan());
Modified: hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java?rev=732581&r1=732580&r2=732581&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java Wed Jan 7 17:18:29 2009
@@ -204,5 +204,11 @@
project.unsetFieldSchema();
super.visit(project);
}
+
+ /**
+ * Calls unsetSchema on the fragment replicate join and then hands
+ * the join to the superclass visit. Per LOFRJoin.unsetSchema, that
+ * call also runs a SchemaRemover over the join column plans.
+ */
+ @Override
+ protected void visit(LOFRJoin frj) throws VisitorException {
+ frj.unsetSchema();
+ super.visit(frj);
+ }
}
Modified: hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java?rev=732581&r1=732580&r2=732581&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java Wed Jan 7 17:18:29 2009
@@ -189,7 +189,7 @@
// position that has a type other than byte array.
LOForEach foreach = new LOForEach(mPlan,
OperatorKey.genOpKey(scope), genPlans, flattens);
-
+ foreach.setAlias(lo.getAlias());
// Insert the foreach into the plan and patch up the plan.
insertAfter(lo, foreach, null);
Modified: hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=732581&r1=732580&r2=732581&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Wed Jan 7 17:18:29 2009
@@ -221,6 +221,59 @@
log.trace("Exiting parseCogroup");
return cogroup;
}
+
+    /**
+     * Builds the LOFRJoin operator for a replicated join, adds it to the
+     * logical plan and connects every join input to it. The parsing logic
+     * mimics parseCogroup. The operator of the first input is passed to
+     * LOFRJoin as the fragment input.
+     * @param gis the join inputs with their join column plans
+     * @param lp the logical plan the new operator is added to
+     * @return the newly created LOFRJoin
+     * @throws ParseException if the inputs do not all have the same
+     * number of join columns as the first input
+     * @throws PlanException declared for the plan add/connect calls
+     */
+    LogicalOperator parseFRJoin(ArrayList<CogroupInput> gis, LogicalPlan lp) throws ParseException, PlanException{
+
+        log.trace("Entering parseFRJoin");
+        log.debug("LogicalPlan: " + lp);
+
+        int n = gis.size();
+        log.debug("Number of join inputs = " + n);
+
+        ArrayList<LogicalOperator> los = new ArrayList<LogicalOperator>();
+        MultiMap<LogicalOperator, LogicalPlan> joinColPlans = new MultiMap<LogicalOperator, LogicalPlan>();
+        boolean[] isInner = new boolean[n];
+
+        // every input must supply as many join columns as the first one
+        int arity = gis.get(0).plans.size();
+
+        for (int i = 0; i < n ; i++){
+
+            CogroupInput gi = gis.get(i);
+            los.add(gi.op);
+            ArrayList<LogicalPlan> planList = gi.plans;
+            int numJoinCols = planList.size();
+            log.debug("Number of join columns = " + numJoinCols);
+
+            if(arity != numJoinCols) {
+                throw new ParseException("The arity of the join columns do not match.");
+            }
+            for(int j = 0; j < numJoinCols; ++j) {
+                joinColPlans.put(gi.op, planList.get(j));
+                for(LogicalOperator root: planList.get(j).getRoots()) {
+                    log.debug("FRJoin input plan root: " + root);
+                }
+            }
+            isInner[i] = gi.isInner;
+        }
+
+        LogicalOperator frj = new LOFRJoin(lp, new OperatorKey(scope, getNextId()), joinColPlans, isInner, gis.get(0).op);
+        lp.add(frj);
+        log.debug("Added operator " + frj.getClass().getName() + " object " + frj + " to the logical plan " + lp);
+
+        for(LogicalOperator op: los) {
+            lp.connect(op, frj);
+            log.debug("Connected operator " + op.getClass().getName() + " to " + frj.getClass().getName() + " in the logical plan");
+        }
+
+        log.trace("Exiting parseFRJoin");
+        return frj;
+    }
/**
* The join operator is translated to foreach
@@ -1598,11 +1651,14 @@
ArrayList<CogroupInput> gis = new ArrayList<CogroupInput>();
log.trace("Entering JoinClause");
log.debug("LogicalPlan: " + lp);
+ LogicalOperator frj = null;
}
{
(gi = GroupItem(lp) { gis.add(gi); }
- ("," gi = GroupItem(lp) { gis.add(gi); })+)
- {log.trace("Exiting JoinClause"); return rewriteJoin(gis, lp);}
+ ("," gi = GroupItem(lp) { gis.add(gi); })+
+ // The addition of using replicated to indicate FRJoin
+ [<USING> ("\"replicated\"" | "\"repl\"") { frj = parseFRJoin(gis, lp); }] )
+ {log.trace("Exiting JoinClause"); return (frj==null) ? rewriteJoin(gis, lp) : frj;}
}
Modified: hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java?rev=732581&r1=732580&r2=732581&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/validators/TypeCheckingVisitor.java Wed Jan 7 17:18:29 2009
@@ -2092,6 +2092,9 @@
// We may have to compute the schema of the input again
// because we have just inserted
if (insertedOp != null) {
+ if(insertedOp.getAlias()==null){
+ insertedOp.setAlias(inputs.get(i).getAlias());
+ }
try {
this.visit(insertedOp);
}
@@ -2328,6 +2331,135 @@
throw vse ;
}
}
+
+    /**
+     * Type checks a fragment replicate join; mimics the type checking
+     * of LOCogroup. The steps are:
+     * regenerate the schema of the join; check that every join column
+     * plan has a single leaf and type check it; compute the merged type
+     * of each join column and insert a cast at the end of any plan whose
+     * output type differs from it; finally regenerate the schema again.
+     * @param frj the fragment replicate join to check
+     * @throws VisitorException on any type checking failure; the message
+     * is also handed to the message collector
+     */
+    protected void visit(LOFRJoin frj) throws VisitorException {
+        try {
+            frj.regenerateSchema();
+        } catch (FrontendException fe) {
+            String msg = "Cannot resolve FRJoin output schema" ;
+            msgCollector.collect(msg, MessageType.Error) ;
+            throw new VisitorException(msg, fe) ;
+        }
+        MultiMap<LogicalOperator, LogicalPlan> joinColPlans
+                                                    = frj.getJoinColPlans() ;
+        List<LogicalOperator> inputs = frj.getInputs() ;
+
+        // Type checking internal plans.
+        for(int i=0;i < inputs.size(); i++) {
+            LogicalOperator input = inputs.get(i) ;
+            List<LogicalPlan> innerPlans
+                        = new ArrayList<LogicalPlan>(joinColPlans.get(input)) ;
+
+            for(int j=0; j < innerPlans.size(); j++) {
+
+                LogicalPlan innerPlan = innerPlans.get(j) ;
+
+                // Check that the inner plan has only 1 output port
+                if (!innerPlan.isSingleLeafPlan()) {
+                    String msg = "FRJoin's inner plans can only "
+                                 + "have one output (leaf)" ;
+                    msgCollector.collect(msg, MessageType.Error) ;
+                    throw new VisitorException(msg) ;
+                }
+
+                checkInnerPlan(innerPlan) ;
+            }
+        }
+
+        try {
+
+            if (!frj.isTupleJoinCol()) {
+                // merge all the inner plan outputs so we know what type
+                // our join column should be
+                byte joinColType = frj.getAtomicJoinColType() ;
+
+                // go through all inputs again to add cast if necessary
+                for(int i=0;i < inputs.size(); i++) {
+                    LogicalOperator input = inputs.get(i) ;
+                    List<LogicalPlan> innerPlans
+                                = new ArrayList<LogicalPlan>(joinColPlans.get(input)) ;
+                    // Checking innerPlan size already done above
+                    LogicalPlan innerPlan = innerPlans.get(0) ;
+                    byte innerType = innerPlan.getSingleLeafPlanOutputType() ;
+                    if (innerType != joinColType) {
+                        insertAtomicCastForFRJInnerPlan(innerPlan,
+                                                        frj,
+                                                        joinColType) ;
+                    }
+                }
+            }
+            else {
+
+                Schema joinColSchema = frj.getTupleJoinColSchema() ;
+
+                // go through all inputs again to add cast if necessary
+                for(int i=0;i < inputs.size(); i++) {
+                    LogicalOperator input = inputs.get(i) ;
+                    List<LogicalPlan> innerPlans
+                                = new ArrayList<LogicalPlan>(joinColPlans.get(input)) ;
+                    for(int j=0;j < innerPlans.size(); j++) {
+                        LogicalPlan innerPlan = innerPlans.get(j) ;
+                        byte innerType = innerPlan.getSingleLeafPlanOutputType() ;
+                        byte expectedType = DataType.BYTEARRAY ;
+
+                        if (!DataType.isAtomic(innerType) && (DataType.TUPLE != innerType)) {
+                            String msg = "Sorry, join by complex types"
+                                       + " will be supported soon" ;
+                            msgCollector.collect(msg, MessageType.Error) ;
+                            throw new VisitorException(msg) ;
+                        }
+
+                        try {
+                            expectedType = joinColSchema.getField(j).type ;
+                        }
+                        catch(ParseException pe) {
+                            String msg = "Cannot resolve FRJoin output schema" ;
+                            msgCollector.collect(msg, MessageType.Error) ;
+                            VisitorException vse = new VisitorException(msg) ;
+                            vse.initCause(pe) ;
+                            throw vse ;
+                        }
+
+                        if (innerType != expectedType) {
+                            insertAtomicCastForFRJInnerPlan(innerPlan,
+                                                            frj,
+                                                            expectedType) ;
+                        }
+                    }
+                }
+            }
+        }
+        catch (FrontendException fe) {
+            String msg = "Cannot resolve FRJoin output schema" ;
+            msgCollector.collect(msg, MessageType.Error) ;
+            throw new VisitorException(msg, fe) ;
+        }
+
+        // TODO: Don't recompute schema here. Remove all from here!
+        // Regenerate the output schema now that casts may have been
+        // inserted into the join column plans.
+        try {
+            frj.regenerateSchema() ;
+        }
+        catch (FrontendException fe) {
+            String msg = "Cannot resolve FRJoin output schema" ;
+            msgCollector.collect(msg, MessageType.Error) ;
+            throw new VisitorException(msg, fe) ;
+        }
+    }
/**
* COGroup
@@ -2460,7 +2592,35 @@
throw vse ;
}
}
+
+    /**
+     * Appends a cast to the given type as the new leaf of a join column
+     * plan of a fragment replicate join, then type checks the cast.
+     * @param innerPlan join column plan to extend; must have exactly one leaf
+     * @param frj the join the plan belongs to, used when collecting the cast warning
+     * @param toType the type to cast the plan's output to
+     * @throws VisitorException if visiting the new cast fails
+     */
+    private void insertAtomicCastForFRJInnerPlan(LogicalPlan innerPlan,
+            LOFRJoin frj, byte toType) throws VisitorException {
+        if (!DataType.isUsableType(toType)) {
+            throw new AssertionError("Cannot cast to type "
+                    + DataType.findTypeName(toType));
+        }
+        List<LogicalOperator> leaves = innerPlan.getLeaves();
+        // Exactly one leaf is required: leaves.get(0) below would fail
+        // on an empty plan, and several leaves are ambiguous.
+        if (leaves.size() != 1) {
+            throw new AssertionError(
+                    "insertAtomicCastForFRJInnerPlan cannot be"
+                            + " used unless there is exactly 1 output port");
+        }
+        ExpressionOperator currentOutput = (ExpressionOperator) leaves.get(0);
+        collectCastWarning(frj, currentOutput.getType(), toType);
+        OperatorKey newKey = genNewOperatorKey(currentOutput);
+        LOCast cast = new LOCast(innerPlan, newKey, currentOutput, toType);
+        innerPlan.add(cast);
+        try {
+            innerPlan.connect(currentOutput, cast);
+        } catch (PlanException ioe) {
+            AssertionError err = new AssertionError(
+                    "Explicit casting insertion");
+            err.initCause(ioe);
+            throw err;
+        }
+        this.visit(cast);
+    }
// This helps insert casting to atomic types in COGroup's inner plans
// as a new leave of the plan
Modified: hadoop/pig/branches/types/test/org/apache/pig/test/NonDefaultBagFactory.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/NonDefaultBagFactory.java?rev=732581&r1=732580&r2=732581&view=diff
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/NonDefaultBagFactory.java (original)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/NonDefaultBagFactory.java Wed Jan 7 17:18:29 2009
@@ -18,6 +18,7 @@
package org.apache.pig.test;
import java.util.Comparator;
+import java.util.List;
import org.apache.pig.data.*;
@@ -25,6 +26,13 @@
// default bag factory.
public class NonDefaultBagFactory extends BagFactory {
public DataBag newDefaultBag() { return null; }
+ /* (non-Javadoc)
+ * @see org.apache.pig.data.BagFactory#newDefaultBag(java.util.List)
+ *
+ * This factory does not build a bag: the list argument is ignored
+ * and null is always returned.
+ */
+ @Override
+ public DataBag newDefaultBag(List<Tuple> listOfTuples) {
+ return null;
+ }
public DataBag newSortedBag(Comparator<Tuple> comp) { return null; }
public DataBag newDistinctBag() { return null; }