You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2009/11/02 21:20:06 UTC
svn commit: r832086 - in /hadoop/pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relational...
Author: pradeepkth
Date: Mon Nov 2 20:20:05 2009
New Revision: 832086
URL: http://svn.apache.org/viewvc?rev=832086&view=rev
Log:
PIG-1035: support for skewed outer join (sriranjan via pradeepkth)
Added:
hadoop/pig/trunk/src/org/apache/pig/impl/util/CompilerUtils.java
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSkewedJoin.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java
hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=832086&r1=832085&r2=832086&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Mon Nov 2 20:20:05 2009
@@ -109,6 +109,8 @@
BUG FIXES
+PIG-1035: support for skewed outer join (sriranjan via pradeepkth)
+
PIG-1030: explain and dump not working with two UDFs inside inner plan of
foreach (rding via pradeepkth)
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=832086&r1=832085&r2=832086&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Mon Nov 2 20:20:05 2009
@@ -91,6 +91,7 @@
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
+import org.apache.pig.impl.util.CompilerUtils;
import org.apache.pig.impl.util.MultiMap;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.Pair;
@@ -1495,7 +1496,7 @@
pkg.setKeyType(type);
pkg.setResultType(DataType.TUPLE);
pkg.setNumInps(2);
- boolean[] inner = {true, true};
+ boolean [] inner = op.getInnerFlags();
pkg.setInner(inner);
pkg.visit(this);
compiledInputs = new MapReduceOper[] {curMROp};
@@ -1504,23 +1505,22 @@
List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
List<Boolean> flat = new ArrayList<Boolean>();
- PhysicalPlan ep = new PhysicalPlan();
- POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
- prj.setColumn(1);
- prj.setOverloaded(false);
- prj.setResultType(DataType.BAG);
- ep.add(prj);
- eps.add(ep);
- flat.add(true);
-
- ep = new PhysicalPlan();
- prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
- prj.setColumn(2);
- prj.setOverloaded(false);
- prj.setResultType(DataType.BAG);
- ep.add(prj);
- eps.add(ep);
- flat.add(true);
+ PhysicalPlan ep;
+ // Add corresponding POProjects
+ for (int i=0; i < 2; i++ ) {
+ ep = new PhysicalPlan();
+ POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ prj.setColumn(i+1);
+ prj.setOverloaded(false);
+ prj.setResultType(DataType.BAG);
+ ep.add(prj);
+ eps.add(ep);
+ if (!inner[i]) {
+ // Outer side: swap an empty bag for a bag with one all-null tuple so flatten keeps the row
+ CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i));
+ }
+ flat.add(true);
+ }
POForEach fe = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, eps, flat);
fe.setResultType(DataType.TUPLE);
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=832086&r1=832085&r2=832086&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Mon Nov 2 20:20:05 2009
@@ -62,6 +62,7 @@
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.CompilerUtils;
import org.apache.pig.impl.util.LinkedMultiMap;
import org.apache.pig.impl.util.MultiMap;
@@ -834,7 +835,7 @@
POSkewedJoin skj;
try {
skj = new POSkewedJoin(new OperatorKey(scope,nodeGen.getNextNodeId(scope)),loj.getRequestedParallelism(),
- inp);
+ inp, loj.getInnerFlags());
skj.setJoinPlans(joinPlans);
}
catch (Exception e) {
@@ -843,6 +844,30 @@
throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
}
skj.setResultType(DataType.TUPLE);
+
+ boolean[] innerFlags = loj.getInnerFlags();
+ for (int i=0; i < inputs.size(); i++) {
+ LogicalOperator op = inputs.get(i);
+ if (!innerFlags[i]) {
+ try {
+ Schema s = op.getSchema();
+ // if the schema cannot be determined
+ if (s == null) {
+ throw new FrontendException();
+ }
+ skj.addSchema(s);
+ } catch (FrontendException e) {
+ int errCode = 2015;
+ String msg = "Couldn't set the schema for outer join" ;
+ throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+ }
+ } else {
+ // Placeholder for an inner input: it is never read, but it keeps the schema list
+ // aligned with the input positions that MRCompiler uses to look up a schema
+ skj.addSchema(null);
+ }
+ }
+
currentPlan.add(skj);
for (LogicalOperator op : inputs) {
@@ -1045,8 +1070,8 @@
Schema inputSchema = null;
try {
inputSchema = joinInput.getSchema();
-
-
+
+
if(inputSchema == null) {
int errCode = 1105;
String msg = "Input (" + joinInput.getAlias() + ") " +
@@ -1059,71 +1084,7 @@
throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
}
- // we currently have POProject[bag] as the only operator in the plan
- // If the bag is an empty bag, we should replace
- // it with a bag with one tuple with null fields so that when we flatten
- // we do not drop records (flatten will drop records if the bag is left
- // as an empty bag) and actually project nulls for the fields in
- // the empty bag
-
- // So we need to get to the following state:
- // POProject[Bag]
- // \
- // POUserFunc["IsEmpty()"] Const[Bag](bag with null fields)
- // \ | POProject[Bag]
- // \ | /
- // POBinCond
-
- POProject relationProject = (POProject) fePlan.getRoots().get(0);
- try {
-
- // condition of the bincond
- POProject relationProjectForIsEmpty = relationProject.clone();
- fePlan.add(relationProjectForIsEmpty);
- String scope = relationProject.getOperatorKey().scope;
- FuncSpec isEmptySpec = new FuncSpec(IsEmpty.class.getName());
- Object f = PigContext.instantiateFuncFromSpec(isEmptySpec);
- POUserFunc isEmpty = new POUserFunc(new OperatorKey(scope, NodeIdGenerator.getGenerator().
- getNextNodeId(scope)), -1, null, isEmptySpec, (EvalFunc) f);
- isEmpty.setResultType(DataType.BOOLEAN);
- fePlan.add(isEmpty);
- fePlan.connect(relationProjectForIsEmpty, isEmpty);
-
- // lhs of bincond (const bag with null fields)
- ConstantExpression ce = new ConstantExpression(new OperatorKey(scope,
- NodeIdGenerator.getGenerator().getNextNodeId(scope)));
- // the following should give a tuple with the
- // required number of nulls
- Tuple t = TupleFactory.getInstance().newTuple(inputSchema.size());
- for(int i = 0; i < inputSchema.size(); i++) {
- t.set(i, null);
- }
- List<Tuple> bagContents = new ArrayList<Tuple>(1);
- bagContents.add(t);
- DataBag bg = new NonSpillableDataBag(bagContents);
- ce.setValue(bg);
- ce.setResultType(DataType.BAG);
- //this operator doesn't have any predecessors
- fePlan.add(ce);
-
- //rhs of bincond is the original project
- // let's set up the bincond now
- POBinCond bincond = new POBinCond(new OperatorKey(scope,
- NodeIdGenerator.getGenerator().getNextNodeId(scope)));
- bincond.setCond(isEmpty);
- bincond.setLhs(ce);
- bincond.setRhs(relationProject);
- bincond.setResultType(DataType.BAG);
- fePlan.add(bincond);
-
- fePlan.connect(isEmpty, bincond);
- fePlan.connect(ce, bincond);
- fePlan.connect(relationProject, bincond);
-
- } catch (Exception e) {
- throw new PlanException("Error setting up outerjoin", e);
- }
-
+ CompilerUtils.addEmptyBagOuterJoin(fePlan, inputSchema);
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSkewedJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSkewedJoin.java?rev=832086&r1=832085&r2=832086&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSkewedJoin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSkewedJoin.java Mon Nov 2 20:20:05 2009
@@ -18,6 +18,7 @@
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -26,6 +27,7 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.MultiMap;
@@ -42,6 +44,10 @@
private static final long serialVersionUID = 1L;
+ private boolean[] mInnerFlags;
+
+ // The schema is used only by the MRCompiler to support outer join
+ transient private List<Schema> inputSchema = new ArrayList<Schema>();
transient private static Log log = LogFactory.getLog(POSkewedJoin.class);
@@ -51,19 +57,30 @@
private MultiMap<PhysicalOperator, PhysicalPlan> mJoinPlans;
public POSkewedJoin(OperatorKey k) {
- this(k,-1,null);
+ this(k,-1,null, null);
}
public POSkewedJoin(OperatorKey k, int rp) {
- this(k, rp, null);
+ this(k, rp, null, null);
}
- public POSkewedJoin(OperatorKey k, List<PhysicalOperator> inp) {
- this(k, -1, inp);
+ public POSkewedJoin(OperatorKey k, List<PhysicalOperator> inp, boolean []flags) {
+ this(k, -1, inp, flags);
}
- public POSkewedJoin(OperatorKey k, int rp, List<PhysicalOperator> inp) {
- super(k,rp,inp);
+ public POSkewedJoin(OperatorKey k, int rp, List<PhysicalOperator> inp, boolean []flags) {
+ super(k,rp,inp);
+ if (flags != null) {
+ // copy the inner flags
+ mInnerFlags = new boolean[flags.length];
+ for (int i = 0; i < flags.length; i++) {
+ mInnerFlags[i] = flags[i];
+ }
+ }
+ }
+
+ public boolean[] getInnerFlags() {
+ return mInnerFlags;
}
public MultiMap<PhysicalOperator, PhysicalPlan> getJoinPlans() {
@@ -93,5 +110,13 @@
public boolean supportsMultipleOutputs() {
return false;
}
+
+ public void addSchema(Schema s) {
+ inputSchema.add(s);
+ }
+
+ public Schema getSchema(int i) {
+ return inputSchema.get(i);
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=832086&r1=832085&r2=832086&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Mon Nov 2 20:20:05 2009
@@ -2019,10 +2019,7 @@
}
frj=parseJoin(gis, lp, LOJoin.JOINTYPE.REPLICATED);
}
- |"\"skewed\"" {
- if(isOuter) {
- throw new ParseException("Skewed join does not support (left|right|full) outer joins");
- }
+ |"\"skewed\"" {
skj = parseJoin(gis, lp, LOJoin.JOINTYPE.SKEWED);
}
|"\"merge\"" {
Added: hadoop/pig/trunk/src/org/apache/pig/impl/util/CompilerUtils.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/util/CompilerUtils.java?rev=832086&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/util/CompilerUtils.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/util/CompilerUtils.java Mon Nov 2 20:20:05 2009
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POBinCond;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.builtin.IsEmpty;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.NonSpillableDataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+
+/*
+ * Utility functions shared by LogToPhyTranslationVisitor and MRCompiler.
+ *
+ */
+public class CompilerUtils {
+
+ public static void addEmptyBagOuterJoin(PhysicalPlan fePlan, Schema inputSchema) throws PlanException {
+ // we currently have POProject[bag] as the only operator in the plan
+ // If the bag is an empty bag, we should replace
+ // it with a bag with one tuple with null fields so that when we flatten
+ // we do not drop records (flatten will drop records if the bag is left
+ // as an empty bag) and actually project nulls for the fields in
+ // the empty bag
+
+ // So we need to get to the following state:
+ // POProject[Bag]
+ // \
+ // POUserFunc["IsEmpty()"] Const[Bag](bag with null fields)
+ // \ | POProject[Bag]
+ // \ | /
+ // POBinCond
+ POProject relationProject = (POProject) fePlan.getRoots().get(0);
+ try {
+
+ // condition of the bincond
+ POProject relationProjectForIsEmpty = relationProject.clone();
+ fePlan.add(relationProjectForIsEmpty);
+ String scope = relationProject.getOperatorKey().scope;
+ FuncSpec isEmptySpec = new FuncSpec(IsEmpty.class.getName());
+ Object f = PigContext.instantiateFuncFromSpec(isEmptySpec);
+ POUserFunc isEmpty = new POUserFunc(new OperatorKey(scope, NodeIdGenerator.getGenerator().
+ getNextNodeId(scope)), -1, null, isEmptySpec, (EvalFunc) f);
+ isEmpty.setResultType(DataType.BOOLEAN);
+ fePlan.add(isEmpty);
+ fePlan.connect(relationProjectForIsEmpty, isEmpty);
+
+ // lhs of bincond (const bag with null fields)
+ ConstantExpression ce = new ConstantExpression(new OperatorKey(scope,
+ NodeIdGenerator.getGenerator().getNextNodeId(scope)));
+ // the following should give a tuple with the
+ // required number of nulls
+ Tuple t = TupleFactory.getInstance().newTuple(inputSchema.size());
+ for(int i = 0; i < inputSchema.size(); i++) {
+ t.set(i, null);
+ }
+ List<Tuple> bagContents = new ArrayList<Tuple>(1);
+ bagContents.add(t);
+ DataBag bg = new NonSpillableDataBag(bagContents);
+ ce.setValue(bg);
+ ce.setResultType(DataType.BAG);
+ //this operator doesn't have any predecessors
+ fePlan.add(ce);
+
+ //rhs of bincond is the original project
+ // let's set up the bincond now
+ POBinCond bincond = new POBinCond(new OperatorKey(scope,
+ NodeIdGenerator.getGenerator().getNextNodeId(scope)));
+ bincond.setCond(isEmpty);
+ bincond.setLhs(ce);
+ bincond.setRhs(relationProject);
+ bincond.setResultType(DataType.BAG);
+ fePlan.add(bincond);
+
+ fePlan.connect(isEmpty, bincond);
+ fePlan.connect(ce, bincond);
+ fePlan.connect(relationProject, bincond);
+
+ } catch (Exception e) {
+ throw new PlanException("Error setting up outerjoin", e);
+ }
+
+ }
+
+}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java?rev=832086&r1=832085&r2=832086&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java Mon Nov 2 20:20:05 2009
@@ -456,7 +456,7 @@
lpt.buildPlan("a = load 'a.txt' as (n:chararray, a:int); ");
lpt.buildPlan("b = load 'b.txt' as (n:chararray, m:chararray); ");
String[] types = new String[] { "left", "right", "full" };
- String[] joinTypes = new String[] { "replicated", "repl", "skewed", "merge" };
+ String[] joinTypes = new String[] { "replicated", "repl", "merge" };
for (int i = 0; i < types.length; i++) {
for(int j = 0; j < joinTypes.length; j++) {
boolean errCaught = false;
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java?rev=832086&r1=832085&r2=832086&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java Mon Nov 2 20:20:05 2009
@@ -324,6 +324,43 @@
return;
}
+ public void testSkewedJoinOuter() throws IOException {
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE5 + "' as (id,name);");
+ pigServer.registerQuery("B = LOAD '" + INPUT_FILE5 + "' as (id,name);");
+ try {
+ DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
+ {
+ pigServer.registerQuery("C = join A by id left, B by id using \"skewed\";");
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ dbfrj.add(iter.next());
+ }
+ }
+ {
+ pigServer.registerQuery("C = join A by id right, B by id using \"skewed\";");
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ dbfrj.add(iter.next());
+ }
+ }
+ {
+ pigServer.registerQuery("C = join A by id full, B by id using \"skewed\";");
+ Iterator<Tuple> iter = pigServer.openIterator("C");
+
+ while(iter.hasNext()) {
+ dbfrj.add(iter.next());
+ }
+ }
+ } catch(Exception e) {
+ System.out.println(e.getMessage());
+ e.printStackTrace();
+ fail("Should support outer join in skewed join");
+ }
+ return;
+ }
+
// pig 1048
public void testSkewedJoinOneValue() throws IOException {
pigServer.registerQuery("A = LOAD '" + INPUT_FILE3 + "' as (id,name);");