You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2009/11/02 21:20:06 UTC

svn commit: r832086 - in /hadoop/pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relational...

Author: pradeepkth
Date: Mon Nov  2 20:20:05 2009
New Revision: 832086

URL: http://svn.apache.org/viewvc?rev=832086&view=rev
Log:
PIG-1035: support for skewed outer join (sriranjan via pradeepkth)

Added:
    hadoop/pig/trunk/src/org/apache/pig/impl/util/CompilerUtils.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSkewedJoin.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=832086&r1=832085&r2=832086&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Mon Nov  2 20:20:05 2009
@@ -109,6 +109,8 @@
 
 BUG FIXES
 
+PIG-1035: support for skewed outer join (sriranjan via pradeepkth)
+
 PIG-1030: explain and dump not working with two UDFs inside inner plan of
 foreach (rding via pradeepkth)
 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=832086&r1=832085&r2=832086&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Mon Nov  2 20:20:05 2009
@@ -91,6 +91,7 @@
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
+import org.apache.pig.impl.util.CompilerUtils;
 import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.Pair;
@@ -1495,7 +1496,7 @@
 			pkg.setKeyType(type);
 			pkg.setResultType(DataType.TUPLE);
 			pkg.setNumInps(2);
-			boolean[] inner = {true, true}; 
+			boolean [] inner = op.getInnerFlags();
 			pkg.setInner(inner);            
 			pkg.visit(this);       
 			compiledInputs = new MapReduceOper[] {curMROp};
@@ -1504,23 +1505,22 @@
 			List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
 			List<Boolean> flat = new ArrayList<Boolean>();
 			
-			PhysicalPlan ep = new PhysicalPlan();
-			POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
-			prj.setColumn(1);
-			prj.setOverloaded(false);
-			prj.setResultType(DataType.BAG);
-			ep.add(prj);
-			eps.add(ep);
-			flat.add(true);
-
-			ep = new PhysicalPlan();
-			prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
-			prj.setColumn(2);
-			prj.setOverloaded(false);
-			prj.setResultType(DataType.BAG);
-			ep.add(prj);
-			eps.add(ep);
-			flat.add(true);
+			PhysicalPlan ep;
+			// Add corresponding POProjects
+			for (int i=0; i < 2; i++ ) {
+			    ep = new PhysicalPlan();
+			    POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+			    prj.setColumn(i+1);
+			    prj.setOverloaded(false);
+			    prj.setResultType(DataType.BAG);
+			    ep.add(prj);
+			    eps.add(ep);
+			    if (!inner[i]) {
+			        // Add an empty bag for outer join
+			        CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i));
+			    }
+			    flat.add(true);
+			}
 
 			POForEach fe = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, eps, flat);
 			fe.setResultType(DataType.TUPLE);

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=832086&r1=832085&r2=832086&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Mon Nov  2 20:20:05 2009
@@ -62,6 +62,7 @@
 import org.apache.pig.impl.plan.VisitorException;
 
 
+import org.apache.pig.impl.util.CompilerUtils;
 import org.apache.pig.impl.util.LinkedMultiMap;
 import org.apache.pig.impl.util.MultiMap;
 
@@ -834,7 +835,7 @@
 			POSkewedJoin skj;
 			try {
 				skj = new POSkewedJoin(new OperatorKey(scope,nodeGen.getNextNodeId(scope)),loj.getRequestedParallelism(),
-											inp);
+											inp, loj.getInnerFlags());
 				skj.setJoinPlans(joinPlans);
 			}
 			catch (Exception e) {
@@ -843,6 +844,30 @@
 				throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
 			}
 			skj.setResultType(DataType.TUPLE);
+			
+            boolean[] innerFlags = loj.getInnerFlags();
+			for (int i=0; i < inputs.size(); i++) {
+				LogicalOperator op = inputs.get(i);
+				if (!innerFlags[i]) {
+					try {
+						Schema s = op.getSchema();
+						// if the schema cannot be determined
+						if (s == null) {
+							throw new FrontendException();
+						}
+						skj.addSchema(s);
+					} catch (FrontendException e) {
+						int errCode = 2015;
+						String msg = "Couldn't set the schema for outer join" ;
+						throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+					}
+				} else {
+				    // This will never be retrieved. It just guarantees that the index will be valid when
+				    // MRCompiler is trying to read the schema
+				    skj.addSchema(null);
+				}
+			}
+			
 			currentPlan.add(skj);
 
 			for (LogicalOperator op : inputs) {
@@ -1045,8 +1070,8 @@
         Schema inputSchema = null;
         try {
             inputSchema = joinInput.getSchema();
-            
-            
+         
+          
             if(inputSchema == null) {
                 int errCode = 1105;
                 String msg = "Input (" + joinInput.getAlias() + ") " +
@@ -1059,71 +1084,7 @@
             throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
         }
         
-        // we currently have POProject[bag] as the only operator in the plan
-        // If the bag is an empty bag, we should replace
-        // it with a bag with one tuple with null fields so that when we flatten
-        // we do not drop records (flatten will drop records if the bag is left
-        // as an empty bag) and actually project nulls for the fields in 
-        // the empty bag
-        
-        // So we need to get to the following state:
-        // POProject[Bag]
-        //         \     
-        //    POUserFunc["IsEmpty()"] Const[Bag](bag with null fields)   
-        //                        \      |    POProject[Bag]             
-        //                         \     |    /
-        //                          POBinCond
-        
-        POProject relationProject = (POProject) fePlan.getRoots().get(0);
-        try {
-            
-            // condition of the bincond
-            POProject relationProjectForIsEmpty = relationProject.clone();
-            fePlan.add(relationProjectForIsEmpty);
-            String scope = relationProject.getOperatorKey().scope;
-            FuncSpec isEmptySpec = new FuncSpec(IsEmpty.class.getName());
-            Object f = PigContext.instantiateFuncFromSpec(isEmptySpec);
-            POUserFunc isEmpty = new POUserFunc(new OperatorKey(scope, NodeIdGenerator.getGenerator().
-                        getNextNodeId(scope)), -1, null, isEmptySpec, (EvalFunc) f);
-            isEmpty.setResultType(DataType.BOOLEAN);
-            fePlan.add(isEmpty);
-            fePlan.connect(relationProjectForIsEmpty, isEmpty);
-            
-            // lhs of bincond (const bag with null fields)
-            ConstantExpression ce = new ConstantExpression(new OperatorKey(scope,
-                    NodeIdGenerator.getGenerator().getNextNodeId(scope)));
-            // the following should give a tuple with the
-            // required number of nulls
-            Tuple t = TupleFactory.getInstance().newTuple(inputSchema.size());
-            for(int i = 0; i < inputSchema.size(); i++) {
-                t.set(i, null);
-            }
-            List<Tuple> bagContents = new ArrayList<Tuple>(1);
-            bagContents.add(t);
-            DataBag bg = new NonSpillableDataBag(bagContents);
-            ce.setValue(bg);
-            ce.setResultType(DataType.BAG);
-            //this operator doesn't have any predecessors
-            fePlan.add(ce);
-            
-            //rhs of bincond is the original project
-            // let's set up the bincond now
-            POBinCond bincond = new POBinCond(new OperatorKey(scope,
-                    NodeIdGenerator.getGenerator().getNextNodeId(scope)));
-            bincond.setCond(isEmpty);
-            bincond.setLhs(ce);
-            bincond.setRhs(relationProject);
-            bincond.setResultType(DataType.BAG);
-            fePlan.add(bincond);
-
-            fePlan.connect(isEmpty, bincond);
-            fePlan.connect(ce, bincond);
-            fePlan.connect(relationProject, bincond);
-
-        } catch (Exception e) {
-            throw new PlanException("Error setting up outerjoin", e);
-        }
-        
+        CompilerUtils.addEmptyBagOuterJoin(fePlan, inputSchema);
         
     }
 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSkewedJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSkewedJoin.java?rev=832086&r1=832085&r2=832086&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSkewedJoin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSkewedJoin.java Mon Nov  2 20:20:05 2009
@@ -18,6 +18,7 @@
 
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -26,6 +27,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.MultiMap;
@@ -42,6 +44,10 @@
 
 	
 	private static final long serialVersionUID = 1L;
+	private boolean[] mInnerFlags;
+	
+	// The schema is used only by the MRCompiler to support outer join
+	transient private List<Schema> inputSchema = new ArrayList<Schema>();
 	
 	transient private static Log log = LogFactory.getLog(POSkewedJoin.class);
 	
@@ -51,19 +57,30 @@
     private MultiMap<PhysicalOperator, PhysicalPlan> mJoinPlans;
 
     public POSkewedJoin(OperatorKey k)  {
-        this(k,-1,null);
+        this(k,-1,null, null);
     }
 
     public POSkewedJoin(OperatorKey k, int rp) {
-        this(k, rp, null);
+        this(k, rp, null, null);
     }
 
-    public POSkewedJoin(OperatorKey k, List<PhysicalOperator> inp) {
-        this(k, -1, inp);
+    public POSkewedJoin(OperatorKey k, List<PhysicalOperator> inp, boolean []flags) {
+        this(k, -1, inp, flags);
     }
 
-    public POSkewedJoin(OperatorKey k, int rp, List<PhysicalOperator> inp) {
-        super(k,rp,inp);      
+    public POSkewedJoin(OperatorKey k, int rp, List<PhysicalOperator> inp, boolean []flags) {
+        super(k,rp,inp);
+        if (flags != null) {
+        	// copy the inner flags
+        	mInnerFlags = new boolean[flags.length];
+        	for (int i = 0; i < flags.length; i++) {
+        		mInnerFlags[i] = flags[i];
+        	}
+        }
+    }
+    
+    public boolean[] getInnerFlags() {
+    	return mInnerFlags;
     }
     
     public MultiMap<PhysicalOperator, PhysicalPlan> getJoinPlans() {
@@ -93,5 +110,13 @@
 	public boolean supportsMultipleOutputs() {	
 		return false;
 	}
+	
+	public void addSchema(Schema s) {
+		inputSchema.add(s);
+	}
+	
+	public Schema getSchema(int i) {
+		return inputSchema.get(i);
+	}
 
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=832086&r1=832085&r2=832086&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Mon Nov  2 20:20:05 2009
@@ -2019,10 +2019,7 @@
                     }
 				    frj=parseJoin(gis, lp, LOJoin.JOINTYPE.REPLICATED);
 				}
-		    |"\"skewed\"" { 
-		    	    if(isOuter) {
-                        throw new ParseException("Skewed join does not support (left|right|full) outer joins");
-                    }
+		    |"\"skewed\"" {
 		    	    skj = parseJoin(gis, lp, LOJoin.JOINTYPE.SKEWED); 
 		    	}
 		    |"\"merge\"" { 

Added: hadoop/pig/trunk/src/org/apache/pig/impl/util/CompilerUtils.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/util/CompilerUtils.java?rev=832086&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/util/CompilerUtils.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/util/CompilerUtils.java Mon Nov  2 20:20:05 2009
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POBinCond;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.builtin.IsEmpty;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.NonSpillableDataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+
+/* 
+ * A class to add util functions that gets used by LogToPhyTranslator and MRCompiler
+ * 
+ */
+public class CompilerUtils {
+
+    public static void addEmptyBagOuterJoin(PhysicalPlan fePlan, Schema inputSchema) throws PlanException {
+        // we currently have POProject[bag] as the only operator in the plan
+        // If the bag is an empty bag, we should replace
+        // it with a bag with one tuple with null fields so that when we flatten
+        // we do not drop records (flatten will drop records if the bag is left
+        // as an empty bag) and actually project nulls for the fields in 
+        // the empty bag
+        
+        // So we need to get to the following state:
+        // POProject[Bag]
+        //         \     
+        //    POUserFunc["IsEmpty()"] Const[Bag](bag with null fields)   
+        //                        \      |    POProject[Bag]             
+        //                         \     |    /
+        //                          POBinCond
+        POProject relationProject = (POProject) fePlan.getRoots().get(0);
+        try {
+            
+            // condition of the bincond
+            POProject relationProjectForIsEmpty = relationProject.clone();
+            fePlan.add(relationProjectForIsEmpty);
+            String scope = relationProject.getOperatorKey().scope;
+            FuncSpec isEmptySpec = new FuncSpec(IsEmpty.class.getName());
+            Object f = PigContext.instantiateFuncFromSpec(isEmptySpec);
+            POUserFunc isEmpty = new POUserFunc(new OperatorKey(scope, NodeIdGenerator.getGenerator().
+                        getNextNodeId(scope)), -1, null, isEmptySpec, (EvalFunc) f);
+            isEmpty.setResultType(DataType.BOOLEAN);
+            fePlan.add(isEmpty);
+            fePlan.connect(relationProjectForIsEmpty, isEmpty);
+            
+            // lhs of bincond (const bag with null fields)
+            ConstantExpression ce = new ConstantExpression(new OperatorKey(scope,
+                    NodeIdGenerator.getGenerator().getNextNodeId(scope)));
+            // the following should give a tuple with the
+            // required number of nulls
+            Tuple t = TupleFactory.getInstance().newTuple(inputSchema.size());
+            for(int i = 0; i < inputSchema.size(); i++) {
+                t.set(i, null);
+            }
+            List<Tuple> bagContents = new ArrayList<Tuple>(1);
+            bagContents.add(t);
+            DataBag bg = new NonSpillableDataBag(bagContents);
+            ce.setValue(bg);
+            ce.setResultType(DataType.BAG);
+            //this operator doesn't have any predecessors
+            fePlan.add(ce);
+            
+            //rhs of bincond is the original project
+            // let's set up the bincond now
+            POBinCond bincond = new POBinCond(new OperatorKey(scope,
+                    NodeIdGenerator.getGenerator().getNextNodeId(scope)));
+            bincond.setCond(isEmpty);
+            bincond.setLhs(ce);
+            bincond.setRhs(relationProject);
+            bincond.setResultType(DataType.BAG);
+            fePlan.add(bincond);
+
+            fePlan.connect(isEmpty, bincond);
+            fePlan.connect(ce, bincond);
+            fePlan.connect(relationProject, bincond);
+
+        } catch (Exception e) {
+            throw new PlanException("Error setting up outerjoin", e);
+        }
+    	
+    }
+
+}

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java?rev=832086&r1=832085&r2=832086&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestJoin.java Mon Nov  2 20:20:05 2009
@@ -456,7 +456,7 @@
         lpt.buildPlan("a = load 'a.txt' as (n:chararray, a:int); ");
         lpt.buildPlan("b = load 'b.txt' as (n:chararray, m:chararray); ");
         String[] types = new String[] { "left", "right", "full" };
-        String[] joinTypes = new String[] { "replicated", "repl", "skewed", "merge" };
+        String[] joinTypes = new String[] { "replicated", "repl", "merge" };
         for (int i = 0; i < types.length; i++) {
             for(int j = 0; j < joinTypes.length; j++) {
                 boolean errCaught = false;

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java?rev=832086&r1=832085&r2=832086&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestSkewedJoin.java Mon Nov  2 20:20:05 2009
@@ -324,6 +324,43 @@
         return;
     }
     
+    public void testSkewedJoinOuter() throws IOException {
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE5 + "' as (id,name);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE5 + "' as (id,name);");
+        try {
+            DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
+            {
+                pigServer.registerQuery("C = join A by id left, B by id using \"skewed\";");
+                Iterator<Tuple> iter = pigServer.openIterator("C");
+                
+                while(iter.hasNext()) {
+                    dbfrj.add(iter.next());
+                }
+            }
+            {
+                pigServer.registerQuery("C = join A by id right, B by id using \"skewed\";");
+                Iterator<Tuple> iter = pigServer.openIterator("C");
+                
+                while(iter.hasNext()) {
+                    dbfrj.add(iter.next());
+                }
+            }
+            {
+                pigServer.registerQuery("C = join A by id full, B by id using \"skewed\";");
+                Iterator<Tuple> iter = pigServer.openIterator("C");
+                
+                while(iter.hasNext()) {
+                    dbfrj.add(iter.next());
+                }
+            }
+        } catch(Exception e) {
+            System.out.println(e.getMessage());
+            e.printStackTrace();
+            fail("Should support outer join in skewed join");
+        }
+        return;
+    }
+    
     // pig 1048
     public void testSkewedJoinOneValue() throws IOException {
         pigServer.registerQuery("A = LOAD '" + INPUT_FILE3 + "' as (id,name);");