You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2014/05/11 00:44:37 UTC
svn commit: r1593741 - in /pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/newplan/logical/ src/org/apache/pig/parser/
test/org/apache/pig/test/
Author: cheolsoo
Date: Sat May 10 22:44:36 2014
New Revision: 1593741
URL: http://svn.apache.org/r1593741
Log:
PIG-3902: PigServer creates cycle (thedatachef via cheolsoo)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/PigServer.java
pig/trunk/src/org/apache/pig/newplan/logical/Util.java
pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1593741&r1=1593740&r2=1593741&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sat May 10 22:44:36 2014
@@ -127,6 +127,8 @@ PIG-3882: Multiquery off mode execution
BUG FIXES
+PIG-3902: PigServer creates cycle (thedatachef via cheolsoo)
+
PIG-3930: "java.io.IOException: Cannot initialize Cluster" in local mode with hadoopversion=23 dependencies (jira.shegalov via cheolsoo)
PIG-3921: Obsolete entries in piggybank javadoc build script (mrflip via cheolsoo)
Modified: pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigServer.java?rev=1593741&r1=1593740&r2=1593741&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ pig/trunk/src/org/apache/pig/PigServer.java Sat May 10 22:44:36 2014
@@ -73,6 +73,7 @@ import org.apache.pig.impl.util.UriUtil;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.newplan.DependencyOrderWalker;
import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.logical.Util;
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
import org.apache.pig.newplan.logical.expression.LogicalExpressionVisitor;
import org.apache.pig.newplan.logical.expression.ScalarExpression;
@@ -1010,6 +1011,7 @@ public class PigServer {
alias = getLastRel();
}
currDAG.parseQuery();
+ currDAG.skipStores(); // skip the stores that have already been processed
currDAG.buildPlan( alias );
try {
@@ -1273,6 +1275,7 @@ public class PigServer {
execute();
}
currDAG.parseQuery();
+ currDAG.skipStores();
currDAG.buildPlan( alias );
currDAG.compile();
} catch (IOException e) {
@@ -1455,12 +1458,9 @@ public class PigServer {
/**
* Call back method for counting executed stores.
*/
- private void countExecutedStores() {
- for( Operator sink : lp.getSinks() ) {
- if( sink instanceof LOStore ) {
- processedStores++;
- }
- }
+ private void countExecutedStores() throws FrontendException {
+ List<LOStore> sinks = Util.getLogicalRelationalOperators(lp, LOStore.class);
+ processedStores += sinks.size();
}
Map<LogicalRelationalOperator, LogicalPlan> getAliases() {
@@ -1532,11 +1532,21 @@ public class PigServer {
}
queue.add( op );
} else {
- List<Operator> sinks = lp.getSinks();
- if( sinks != null ) {
- for( Operator sink : sinks ) {
- if( sink instanceof LOStore )
- queue.add( sink );
+ List<LOStore> stores = Util.getLogicalRelationalOperators(lp, LOStore.class);
+ for (LOStore op : stores) {
+ boolean addSink = true;
+ // Only add if all the successors are loads
+ List<Operator> succs = lp.getSuccessors(op);
+ if (succs != null && succs.size() > 0) {
+ for (Operator succ : succs) {
+ if (!(succ instanceof LOLoad)) {
+ addSink = false;
+ break;
+ }
+ }
+ }
+ if (addSink) {
+ queue.add(op);
}
}
}
@@ -1584,27 +1594,45 @@ public class PigServer {
* Remove stores that have been executed previously from the overall plan.
*/
private void skipStores() throws IOException {
- List<Operator> sinks = lp.getSinks();
+ // Get stores specifically
+ List<LOStore> sinks = Util.getLogicalRelationalOperators(lp, LOStore.class);
List<Operator> sinksToRemove = new ArrayList<Operator>();
int skipCount = processedStores;
if( skipCount > 0 ) {
- for( Operator sink : sinks ) {
- if( sink instanceof LOStore ) {
- sinksToRemove.add( sink );
- skipCount--;
- if( skipCount == 0 )
- break;
- }
+ for( LOStore sink : sinks ) {
+ sinksToRemove.add( sink );
+ skipCount--;
+ if( skipCount == 0 )
+ break;
}
}
for( Operator op : sinksToRemove ) {
+ // In the multiquery case it is entirely possible for
+ // a store not to be a leaf (sink), in which case it has
+ // successors that need to be removed as well.
+ removeToLoad(op);
Operator pred = lp.getPredecessors( op ).get(0);
lp.disconnect( pred, op );
lp.remove( op );
}
}
+ private void removeToLoad(Operator toRemove) throws IOException {
+ List<Operator> successors = lp.getSuccessors(toRemove);
+ List<Operator> succToRemove = new ArrayList<Operator>();
+ if (successors != null && successors.size() > 0) {
+ succToRemove.addAll(successors);
+ for (Operator succ : succToRemove) {
+ lp.disconnect( toRemove, succ );
+ if (!(succ instanceof LOLoad)) {
+ removeToLoad(succ);
+ lp.remove(succ);
+ }
+ }
+ }
+ }
+
/**
* Accumulate the given statement to previous query statements and generate
* an overall (raw) plan.
@@ -1796,7 +1824,7 @@ public class PigServer {
for (LOStore store : storeOps) {
String ifile = load.getFileSpec().getFileName();
String ofile = store.getFileSpec().getFileName();
- if (ofile.compareTo(ifile) == 0) {
+ if (ofile.equals(ifile)) {
// if there is no path from the load to the store,
// then connect the store to the load to create the
// dependency of the store on the load. If there is
Modified: pig/trunk/src/org/apache/pig/newplan/logical/Util.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/Util.java?rev=1593741&r1=1593740&r2=1593741&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/Util.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/Util.java Sat May 10 22:44:36 2014
@@ -18,61 +18,82 @@
package org.apache.pig.newplan.logical;
import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.newplan.DependencyOrderWalker;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.relational.LOCogroup;
+import org.apache.pig.newplan.logical.relational.LOCross;
+import org.apache.pig.newplan.logical.relational.LOCube;
+import org.apache.pig.newplan.logical.relational.LODistinct;
+import org.apache.pig.newplan.logical.relational.LOFilter;
import org.apache.pig.newplan.logical.relational.LOForEach;
import org.apache.pig.newplan.logical.relational.LOGenerate;
import org.apache.pig.newplan.logical.relational.LOInnerLoad;
+import org.apache.pig.newplan.logical.relational.LOJoin;
+import org.apache.pig.newplan.logical.relational.LOLimit;
+import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LONative;
+import org.apache.pig.newplan.logical.relational.LORank;
+import org.apache.pig.newplan.logical.relational.LOSort;
+import org.apache.pig.newplan.logical.relational.LOSplit;
+import org.apache.pig.newplan.logical.relational.LOSplitOutput;
+import org.apache.pig.newplan.logical.relational.LOStore;
+import org.apache.pig.newplan.logical.relational.LOStream;
+import org.apache.pig.newplan.logical.relational.LOUnion;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import com.google.common.collect.Lists;
+
public class Util {
- public static LogicalSchema translateSchema(Schema schema) {
+ public static LogicalSchema translateSchema(Schema schema) {
if (schema == null) {
return null;
}
-
+
LogicalSchema s2 = new LogicalSchema();
List<Schema.FieldSchema> ll = schema.getFields();
for (Schema.FieldSchema f: ll) {
- LogicalSchema.LogicalFieldSchema f2 =
+ LogicalSchema.LogicalFieldSchema f2 =
new LogicalSchema.LogicalFieldSchema(f.alias, translateSchema(f.schema), f.type);
-
+
s2.addField(f2);
}
-
+
return s2;
}
-
- public static LogicalSchema.LogicalFieldSchema translateFieldSchema(Schema.FieldSchema fs) {
+
+ public static LogicalSchema.LogicalFieldSchema translateFieldSchema(Schema.FieldSchema fs) {
LogicalSchema newSchema = null;
if (fs.schema!=null) {
newSchema = translateSchema(fs.schema);
}
-
+
LogicalSchema.LogicalFieldSchema newFs = new LogicalSchema.LogicalFieldSchema(fs.alias, newSchema, fs.type);
return newFs;
}
-
+
/**
* This function translates the new LogicalSchema into old Schema format required
* by PhysicalOperators
* @param schema LogicalSchema to be converted to Schema
* @return Schema that is converted from LogicalSchema
- * @throws FrontendException
+ * @throws FrontendException
*/
- public static Schema translateSchema(LogicalSchema schema) {
+ public static Schema translateSchema(LogicalSchema schema) {
if (schema == null) {
return null;
}
-
+
Schema s2 = new Schema();
List<LogicalSchema.LogicalFieldSchema> ll = schema.getFields();
for (LogicalSchema.LogicalFieldSchema f: ll) {
@@ -84,17 +105,17 @@ public class Util {
} catch (FrontendException e) {
}
}
-
+
return s2;
}
-
+
/**
* If schema argument has fields where a bag does not contain a tuple schema,
* it inserts a tuple schema. It does so for all inner levels.
- * eg bag({int}) => bag({(int)})
+ * eg bag({int}) => bag({(int)})
* @param sch
* @return modified schema
- * @throws FrontendException
+ * @throws FrontendException
*/
public static Schema fixSchemaAddTupleInBag(Schema sch) throws FrontendException{
LogicalSchema logSch = translateSchema(sch);
@@ -110,7 +131,7 @@ public class Util {
if (fs.schema!=null) {
newSchema = translateSchema(fs.schema);
}
-
+
Schema.FieldSchema newFs = null;
try {
newFs = new Schema.FieldSchema(null, newSchema, fs.type);
@@ -118,11 +139,11 @@ public class Util {
}
return newFs;
}
-
+
public static LOForEach addForEachAfter(LogicalPlan plan, LogicalRelationalOperator op, int branch,
Set<Integer> columnsToDrop) throws FrontendException {
LOForEach foreach = new LOForEach(plan);
-
+
plan.add(foreach);
List<Operator> next = plan.getSuccessors(op);
if (next != null) {
@@ -133,26 +154,26 @@ public class Util {
else {
plan.connect(op, foreach);
}
-
+
LogicalPlan innerPlan = new LogicalPlan();
foreach.setInnerPlan(innerPlan);
-
+
LogicalSchema schema = op.getSchema();
-
+
// build foreach inner plan
List<LogicalExpressionPlan> exps = new ArrayList<LogicalExpressionPlan>();
LOGenerate gen = new LOGenerate(innerPlan, exps, new boolean[schema.size()-columnsToDrop.size()]);
innerPlan.add(gen);
-
+
for (int i=0, j=0; i<schema.size(); i++) {
if (columnsToDrop.contains(i)) {
continue;
}
-
+
LOInnerLoad innerLoad = new LOInnerLoad(innerPlan, foreach, i);
innerPlan.add(innerLoad);
innerPlan.connect(innerLoad, gen);
-
+
LogicalExpressionPlan exp = new LogicalExpressionPlan();
ProjectExpression prj = new ProjectExpression(exp, j++, -1, gen);
exp.add(prj);
@@ -160,4 +181,136 @@ public class Util {
}
return foreach;
}
+
+ /**
+ * Returns a LinkedList of operators contained within the logical plan
+ * which implement the supplied class, in dependency order. Returns an empty
+ * LinkedList if no such operators exist.
+ * @param plan
+ * @param opClass
+ * @return a LinkedList of operators contained within the plan which
+ * implement the supplied class; empty if no such ops exist.
+ * @throws FrontendException
+ */
+ public static <C extends LogicalRelationalOperator> LinkedList<C> getLogicalRelationalOperators(LogicalPlan plan,
+ Class<C> opClass) throws FrontendException {
+ OpFinder<C> finder = new OpFinder<C>(plan, opClass);
+ finder.visit();
+ return finder.getFoundOps();
+ }
+
+ private static class OpFinder<C extends LogicalRelationalOperator> extends LogicalRelationalNodesVisitor {
+ final Class<C> opClass;
+ private LinkedList<C> foundOps = Lists.newLinkedList();
+
+ public OpFinder(LogicalPlan plan, Class<C> opClass) throws FrontendException {
+ super(plan, new DependencyOrderWalker(plan));
+ this.opClass = opClass;
+ }
+
+ public LinkedList<C> getFoundOps() {
+ return foundOps;
+ }
+
+ @SuppressWarnings("unchecked")
+ private void visitOp(LogicalRelationalOperator op) {
+ if (opClass.isAssignableFrom(op.getClass())) {
+ foundOps.add((C) op);
+ }
+ }
+
+ public void visit(LOLoad load) throws FrontendException {
+ visitOp(load);
+ }
+
+ @Override
+ public void visit(LOFilter filter) throws FrontendException {
+ visitOp(filter);
+ }
+
+ @Override
+ public void visit(LOStore store) throws FrontendException {
+ visitOp(store);
+ }
+
+ @Override
+ public void visit(LOJoin join) throws FrontendException {
+ visitOp(join);
+ }
+
+ @Override
+ public void visit(LOForEach foreach) throws FrontendException {
+ visitOp(foreach);
+ }
+
+ @Override
+ public void visit(LOGenerate gen) throws FrontendException {
+ visitOp(gen);
+ }
+
+ @Override
+ public void visit(LOInnerLoad load) throws FrontendException {
+ visitOp(load);
+ }
+
+ @Override
+ public void visit(LOCube cube) throws FrontendException {
+ visitOp(cube);
+ }
+
+ @Override
+ public void visit(LOCogroup loCogroup) throws FrontendException {
+ visitOp(loCogroup);
+ }
+
+ @Override
+ public void visit(LOSplit loSplit) throws FrontendException {
+ visitOp(loSplit);
+ }
+
+ @Override
+ public void visit(LOSplitOutput loSplitOutput) throws FrontendException {
+ visitOp(loSplitOutput);
+ }
+
+ @Override
+ public void visit(LOUnion loUnion) throws FrontendException {
+ visitOp(loUnion);
+ }
+
+ @Override
+ public void visit(LOSort loSort) throws FrontendException {
+ visitOp(loSort);
+ }
+
+ @Override
+ public void visit(LORank loRank) throws FrontendException{
+ visitOp(loRank);
+ }
+
+ @Override
+ public void visit(LODistinct loDistinct) throws FrontendException {
+ visitOp(loDistinct);
+ }
+
+ @Override
+ public void visit(LOLimit loLimit) throws FrontendException {
+ visitOp(loLimit);
+ }
+
+ @Override
+ public void visit(LOCross loCross) throws FrontendException {
+ visitOp(loCross);
+ }
+
+ @Override
+ public void visit(LOStream loStream) throws FrontendException {
+ visitOp(loStream);
+ }
+
+ @Override
+ public void visit(LONative nativeMR) throws FrontendException{
+ visitOp(nativeMR);
+ }
+ }
}
Modified: pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java?rev=1593741&r1=1593740&r2=1593741&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java Sat May 10 22:44:36 2014
@@ -431,22 +431,21 @@ public class LogicalPlanBuilder {
return alias;
}
-
LOCube createCubeOp() {
- return new LOCube(plan);
+ return new LOCube(plan);
}
String buildCubeOp(SourceLocation loc, LOCube op, String alias, String inputAlias,
- List<String> operations, MultiMap<Integer, LogicalExpressionPlan> expressionPlans)
- throws ParserValidationException {
+ List<String> operations, MultiMap<Integer, LogicalExpressionPlan> expressionPlans)
+ throws ParserValidationException {
- // check if continuously occurring cube operations be combined
- combineCubeOperations((ArrayList<String>) operations, expressionPlans);
+ // check if continuously occurring cube operations can be combined
+ combineCubeOperations((ArrayList<String>) operations, expressionPlans);
- // set the expression plans for cube operator and build cube operator
- op.setExpressionPlans(expressionPlans);
- op.setOperations(operations);
- buildOp(loc, op, alias, inputAlias, null);
+ // set the expression plans for cube operator and build cube operator
+ op.setExpressionPlans(expressionPlans);
+ op.setOperations(operations);
+ buildOp(loc, op, alias, inputAlias, null);
try {
(new ProjectStarExpander(op.getPlan())).visit(op);
(new ProjStarInUdfExpander(op.getPlan())).visit(op);
@@ -454,100 +453,100 @@ public class LogicalPlanBuilder {
} catch (FrontendException e ) {
throw new ParserValidationException( intStream, loc, e );
}
- try {
- alias = convertCubeToFGPlan(loc, op, inputAlias, operations, expressionPlans);
- } catch (FrontendException e) {
- throw new ParserValidationException(intStream, loc, e);
- }
- return alias;
+ try {
+ alias = convertCubeToFGPlan(loc, op, inputAlias, operations, expressionPlans);
+ } catch (FrontendException e) {
+ throw new ParserValidationException(intStream, loc, e);
+ }
+ return alias;
}
// if multiple CUBE operations occur continuously then it can be combined
// together CUBE rel BY CUBE(a,b), CUBE(c,d); => CUBE rel BY CUBE(a,b,c,d)
private void combineCubeOperations(ArrayList<String> operations,
- MultiMap<Integer, LogicalExpressionPlan> expressionPlans) {
+ MultiMap<Integer, LogicalExpressionPlan> expressionPlans) {
+
+ int startIdx = -1;
+ int endIdx = -1;
+ int i = 0;
+ boolean isMerged = false;
+
+ // scan and perform merge of column projections
+ for (i = 0; i < operations.size(); i++) {
+ if ((startIdx == -1) && (operations.get(i).equals("CUBE") == true)) {
+ startIdx = i;
+ } else {
+ if (operations.get(i).equals("CUBE") == true) {
+ endIdx = i;
+ } else {
+ if (endIdx > startIdx) {
+ mergeAndMarkForDelete(operations, expressionPlans, startIdx, endIdx);
+ isMerged = true;
+ startIdx = -1;
+ endIdx = -1;
+ } else {
+ startIdx = -1;
+ endIdx = -1;
+ }
+ }
+ }
+ }
+
+ // this check is required for the case when the sequence of CUBE
+ // operations occurs at the end, like (CUBE, ROLLUP, CUBE, CUBE)
+ // in which case endIdx will be greater than startIdx
+ if (endIdx > startIdx) {
+ isMerged = true;
+ mergeAndMarkForDelete(operations, expressionPlans, startIdx, endIdx);
+ }
- int startIdx = -1;
- int endIdx = -1;
- int i = 0;
- boolean isMerged = false;
-
- // scan and perform merge of column projections
- for (i = 0; i < operations.size(); i++) {
- if ((startIdx == -1) && (operations.get(i).equals("CUBE") == true)) {
- startIdx = i;
- } else {
- if (operations.get(i).equals("CUBE") == true) {
- endIdx = i;
- } else {
- if (endIdx > startIdx) {
- mergeAndMarkForDelete(operations, expressionPlans, startIdx, endIdx);
- isMerged = true;
- startIdx = -1;
- endIdx = -1;
- } else {
- startIdx = -1;
- endIdx = -1;
- }
- }
- }
- }
-
- // this check is required for the case when the sequence of CUBE
- // operations occurs at the end, like (CUBE, ROLLUP, CUBE, CUBE)
- // in which case endIdx will be greater than startIdx
- if (endIdx > startIdx) {
- isMerged = true;
- mergeAndMarkForDelete(operations, expressionPlans, startIdx, endIdx);
- }
-
- // if merged then remove the column projections that were marked for
- // deletion
- if (isMerged) {
- performDeletion(expressionPlans, operations);
- }
+ // if merged then remove the column projections that were marked for
+ // deletion
+ if (isMerged) {
+ performDeletion(expressionPlans, operations);
+ }
}
private void performDeletion(MultiMap<Integer, LogicalExpressionPlan> expressionPlans,
- ArrayList<String> operations) {
+ ArrayList<String> operations) {
+
+ MultiMap<Integer, LogicalExpressionPlan> ep = new MultiMap<Integer, LogicalExpressionPlan>();
+ List<String> op = new ArrayList<String>();
+ int idx = 0;
+ // rearranging indices
+ for (int i = 0; i < operations.size(); i++) {
+ if (operations.get(i) != null) {
+ op.add(idx, operations.get(i));
+ }
+
+ if (expressionPlans.get(i) != null) {
+ ep.put(idx, expressionPlans.get(i));
+ idx++;
+ }
+ }
+
+ // performing deletions
+ operations.clear();
+ operations.addAll(op);
- MultiMap<Integer, LogicalExpressionPlan> ep = new MultiMap<Integer, LogicalExpressionPlan>();
- List<String> op = new ArrayList<String>();
- int idx = 0;
- // rearranging indices
- for (int i = 0; i < operations.size(); i++) {
- if (operations.get(i) != null) {
- op.add(idx, operations.get(i));
- }
-
- if (expressionPlans.get(i) != null) {
- ep.put(idx, expressionPlans.get(i));
- idx++;
- }
- }
-
- // performing deletions
- operations.clear();
- operations.addAll(op);
-
- expressionPlans.clear();
- for (Integer i : ep.keySet()) {
- expressionPlans.put(i, ep.get(i));
- }
+ expressionPlans.clear();
+ for (Integer i : ep.keySet()) {
+ expressionPlans.put(i, ep.get(i));
+ }
}
// performs merging of dimensions of merged cube operation
// Ex: CUBE(a,b), CUBE(c,d) ==> CUBE(a,b,c,d)
// in the above example CUBE operator and dimensions are merged
private void mergeAndMarkForDelete(ArrayList<String> operations,
- MultiMap<Integer, LogicalExpressionPlan> expressionPlans, int startIdx, int endIdx) {
- // mark for delete
- for (int i = startIdx + 1; i <= endIdx; i++) {
- expressionPlans.put(startIdx, expressionPlans.get(i));
- expressionPlans.removeKey(i);
- operations.remove(i);
- operations.add(i, null);
- }
+ MultiMap<Integer, LogicalExpressionPlan> expressionPlans, int startIdx, int endIdx) {
+ // mark for delete
+ for (int i = startIdx + 1; i <= endIdx; i++) {
+ expressionPlans.put(startIdx, expressionPlans.get(i));
+ expressionPlans.removeKey(i);
+ operations.remove(i);
+ operations.add(i, null);
+ }
}
// This function creates logical plan for foreach and groupby operators.
@@ -555,265 +554,265 @@ public class LogicalPlanBuilder {
// disconnects cube operator from the logical plan. It also connects foreach
// plan with groupby plan.
private String convertCubeToFGPlan(SourceLocation loc, LOCube op, String inputAlias,
- List<String> operations, MultiMap<Integer, LogicalExpressionPlan> expressionPlans)
- throws FrontendException {
+ List<String> operations, MultiMap<Integer, LogicalExpressionPlan> expressionPlans)
+ throws FrontendException {
+
+ LOForEach foreach = new LOForEach(plan);
+ LOCogroup groupby = new LOCogroup(plan);
+ LogicalPlan innerPlan = new LogicalPlan();
+ LogicalRelationalOperator gen = new LOGenerate(innerPlan);
+
+ injectForeachOperator(loc, op, foreach);
+
+ // Get all column attributes from the input relation.
+ // Create ProjectExpression for all columns. Based on the
+ // dimensions specified by the user, specified columns will be attached
+ // to CubeDimension/RollupDimension UDF and rest will be pushed down
+ List<Operator> inpOpers = foreach.getPlan().getPredecessors(foreach);
+ List<LogicalExpressionPlan> allExprPlan = new ArrayList<LogicalExpressionPlan>();
+ for (Operator oper : inpOpers) {
+ LogicalSchema schema = new LogicalSchema();
+ schema = ((LogicalRelationalOperator) oper).getSchema();
+
+ if (schema != null) {
+ ArrayList<LogicalFieldSchema> fields = (ArrayList<LogicalFieldSchema>) schema
+ .getFields();
+ for (int i = 0; i < fields.size(); i++) {
+ LogicalExpressionPlan lEplan = new LogicalExpressionPlan();
+ new ProjectExpression(lEplan, i, fields.get(i).alias, null, gen);
+ allExprPlan.add(lEplan);
+ }
+ }
+ }
+
+ // iterate over all operations and generate corresponding UDFs
+ for (int operIdx = 0; operIdx < operations.size(); operIdx++) {
+ List<LogicalExpressionPlan> lexpPlanList = new ArrayList<LogicalExpressionPlan>();
+ List<LogicalExpression> lexpList = new ArrayList<LogicalExpression>();
+
+ lexpPlanList.addAll(expressionPlans.get(operIdx));
+
+ // If duplicates exists in the dimension list then exception is
+ // thrown
+ checkDuplicateProject(lexpPlanList);
+
+ // Construct ProjectExpression from the LogicalExpressionPlans
+ lexpList = getProjectExpList(lexpPlanList, gen);
+
+ for (int i = 0; i < lexpList.size(); i++) {
+ // Retain the columns that needs to be pushed down.
+ // Remove the dimension columns from the input column list
+ // as it will be attached to CubeDimension UDF
+ for (int j = 0; j < allExprPlan.size(); j++) {
+ LogicalExpression lexp = (LogicalExpression) allExprPlan.get(j).getSources()
+ .get(0);
+ String colAlias = ((ProjectExpression) lexpList.get(i)).getColAlias();
+ if (colAlias == null) {
+ colAlias = ((ProjectExpression) lexpList.get(i)).getFieldSchema().alias;
+ }
+
+ String projExpAlias = null;
+ try {
+ projExpAlias = ((ProjectExpression) lexp).getColAlias();
+ } catch (ClassCastException e) {
+ // if it is not projection then it should be
+ // UserFuncExpr.
+ // ignore and continue till next ProjExpr is encountered
+ continue;
+ }
+ if (colAlias.equals(projExpAlias) == true) {
+ allExprPlan.remove(j);
+ } else {
+ // if projected exp alias is a namespaced alias
+ if (projExpAlias.lastIndexOf(":") != -1) {
+ projExpAlias = projExpAlias.substring(
+ projExpAlias.lastIndexOf(":") + 1, projExpAlias.length());
+ if (colAlias.equals(projExpAlias) == true) {
+ allExprPlan.remove(j);
+ }
+ }
+ }
+ }
+ }
+
+ // Create UDF with user specified dimensions
+ LogicalExpressionPlan uexpPlan = new LogicalExpressionPlan();
+ if (operations.get(operIdx).equals("CUBE")) {
+ new UserFuncExpression(uexpPlan, new FuncSpec(CubeDimensions.class.getName()),
+ lexpList);
+ } else {
+ new UserFuncExpression(uexpPlan, new FuncSpec(RollupDimensions.class.getName()),
+ lexpList);
+ }
- LOForEach foreach = new LOForEach(plan);
- LOCogroup groupby = new LOCogroup(plan);
- LogicalPlan innerPlan = new LogicalPlan();
- LogicalRelationalOperator gen = new LOGenerate(innerPlan);
-
- injectForeachOperator(loc, op, foreach);
-
- // Get all column attributes from the input relation.
- // Create ProjectExpression for all columns. Based on the
- // dimensions specified by the user, specified columns will be attached
- // to CubeDimension/RollupDimension UDF and rest will be pushed down
- List<Operator> inpOpers = foreach.getPlan().getPredecessors(foreach);
- List<LogicalExpressionPlan> allExprPlan = new ArrayList<LogicalExpressionPlan>();
- for (Operator oper : inpOpers) {
- LogicalSchema schema = new LogicalSchema();
- schema = ((LogicalRelationalOperator) oper).getSchema();
-
- if (schema != null) {
- ArrayList<LogicalFieldSchema> fields = (ArrayList<LogicalFieldSchema>) schema
- .getFields();
- for (int i = 0; i < fields.size(); i++) {
- LogicalExpressionPlan lEplan = new LogicalExpressionPlan();
- new ProjectExpression(lEplan, i, fields.get(i).alias, null, gen);
- allExprPlan.add(lEplan);
- }
- }
- }
-
- // iterate over all operations and generate corresponding UDFs
- for (int operIdx = 0; operIdx < operations.size(); operIdx++) {
- List<LogicalExpressionPlan> lexpPlanList = new ArrayList<LogicalExpressionPlan>();
- List<LogicalExpression> lexpList = new ArrayList<LogicalExpression>();
-
- lexpPlanList.addAll(expressionPlans.get(operIdx));
-
- // If duplicates exists in the dimension list then exception is
- // thrown
- checkDuplicateProject(lexpPlanList);
-
- // Construct ProjectExpression from the LogicalExpressionPlans
- lexpList = getProjectExpList(lexpPlanList, gen);
-
- for (int i = 0; i < lexpList.size(); i++) {
- // Retain the columns that needs to be pushed down.
- // Remove the dimension columns from the input column list
- // as it will be attached to CubeDimension UDF
- for (int j = 0; j < allExprPlan.size(); j++) {
- LogicalExpression lexp = (LogicalExpression) allExprPlan.get(j).getSources()
- .get(0);
- String colAlias = ((ProjectExpression) lexpList.get(i)).getColAlias();
- if (colAlias == null) {
- colAlias = ((ProjectExpression) lexpList.get(i)).getFieldSchema().alias;
- }
-
- String projExpAlias = null;
- try {
- projExpAlias = ((ProjectExpression) lexp).getColAlias();
- } catch (ClassCastException e) {
- // if it is not projection then it should be
- // UserFuncExpr.
- // ignore and continue till next ProjExpr is encountered
- continue;
- }
- if (colAlias.equals(projExpAlias) == true) {
- allExprPlan.remove(j);
- } else {
- // if projected exp alias is a namespaced alias
- if (projExpAlias.lastIndexOf(":") != -1) {
- projExpAlias = projExpAlias.substring(
- projExpAlias.lastIndexOf(":") + 1, projExpAlias.length());
- if (colAlias.equals(projExpAlias) == true) {
- allExprPlan.remove(j);
- }
- }
- }
- }
- }
-
- // Create UDF with user specified dimensions
- LogicalExpressionPlan uexpPlan = new LogicalExpressionPlan();
- if (operations.get(operIdx).equals("CUBE")) {
- new UserFuncExpression(uexpPlan, new FuncSpec(CubeDimensions.class.getName()),
- lexpList);
- } else {
- new UserFuncExpression(uexpPlan, new FuncSpec(RollupDimensions.class.getName()),
- lexpList);
- }
-
- for (LogicalExpressionPlan lexp : lexpPlanList) {
- Iterator<Operator> it = lexp.getOperators();
- while (it.hasNext()) {
- uexpPlan.add(it.next());
- }
- }
- // Add the UDF to logical expression plan that contains dependent
- // attributes (pushed down from input columns)
- allExprPlan.add(operIdx, uexpPlan);
- }
-
- // If the operator is a UserFuncExpression then set the flatten flags.
- List<Boolean> flattenFlags = new ArrayList<Boolean>();
- for (int idx = 0; idx < allExprPlan.size(); idx++) {
- List<Operator> opers = allExprPlan.get(idx).getSources();
- for (Operator oper : opers) {
- if (oper instanceof ProjectExpression) {
- flattenFlags.add(false);
- } else if (oper instanceof UserFuncExpression) {
- flattenFlags.add(true);
- }
- }
- }
-
- // Generate and Foreach operator creation
- String falias = null;
- try {
- buildGenerateOp(loc, (LOForEach) foreach, (LOGenerate) gen, allExprPlan,
- flattenFlags, getUserDefinedSchema(allExprPlan));
- falias = buildForeachOp(loc, (LOForEach) foreach, "cube", inputAlias, innerPlan);
- } catch (ParserValidationException pve) {
- throw new FrontendException(pve);
- }
-
- List<Boolean> innerFlags = new ArrayList<Boolean>();
- List<String> inpAliases = new ArrayList<String>();
- inpAliases.add(falias);
- innerFlags.add(false);
-
- // Get the output schema of foreach operator and reconstruct the
- // LogicalExpressionPlan for each dimensional attributes
- MultiMap<Integer, LogicalExpressionPlan> exprPlansCopy = new MultiMap<Integer, LogicalExpressionPlan>();
-
- for (LogicalExpressionPlan exp : expressionPlans.values()) {
- LogicalExpression lexp = (LogicalExpression) exp.getSources().get(0);
- LogicalExpressionPlan epGrp = new LogicalExpressionPlan();
- new ProjectExpression(epGrp, 0, lexp.getFieldSchema().alias, null, groupby);
- exprPlansCopy.put(0, epGrp);
- }
-
- // build group by operator
- try {
- return buildGroupOp(loc, (LOCogroup) groupby, op.getAlias(), inpAliases, exprPlansCopy,
- GROUPTYPE.REGULAR, innerFlags, null);
- } catch (ParserValidationException pve) {
- throw new FrontendException(pve);
- }
+ for (LogicalExpressionPlan lexp : lexpPlanList) {
+ Iterator<Operator> it = lexp.getOperators();
+ while (it.hasNext()) {
+ uexpPlan.add(it.next());
+ }
+ }
+ // Add the UDF to logical expression plan that contains dependent
+ // attributes (pushed down from input columns)
+ allExprPlan.add(operIdx, uexpPlan);
+ }
+
+ // If the operator is a UserFuncExpression then set the flatten flags.
+ List<Boolean> flattenFlags = new ArrayList<Boolean>();
+ for (int idx = 0; idx < allExprPlan.size(); idx++) {
+ List<Operator> opers = allExprPlan.get(idx).getSources();
+ for (Operator oper : opers) {
+ if (oper instanceof ProjectExpression) {
+ flattenFlags.add(false);
+ } else if (oper instanceof UserFuncExpression) {
+ flattenFlags.add(true);
+ }
+ }
+ }
+
+ // Generate and Foreach operator creation
+ String falias = null;
+ try {
+ buildGenerateOp(loc, (LOForEach) foreach, (LOGenerate) gen, allExprPlan,
+ flattenFlags, getUserDefinedSchema(allExprPlan));
+ falias = buildForeachOp(loc, (LOForEach) foreach, "cube", inputAlias, innerPlan);
+ } catch (ParserValidationException pve) {
+ throw new FrontendException(pve);
+ }
+
+ List<Boolean> innerFlags = new ArrayList<Boolean>();
+ List<String> inpAliases = new ArrayList<String>();
+ inpAliases.add(falias);
+ innerFlags.add(false);
+
+ // Get the output schema of foreach operator and reconstruct the
+ // LogicalExpressionPlan for each dimensional attributes
+ MultiMap<Integer, LogicalExpressionPlan> exprPlansCopy = new MultiMap<Integer, LogicalExpressionPlan>();
+
+ for (LogicalExpressionPlan exp : expressionPlans.values()) {
+ LogicalExpression lexp = (LogicalExpression) exp.getSources().get(0);
+ LogicalExpressionPlan epGrp = new LogicalExpressionPlan();
+ new ProjectExpression(epGrp, 0, lexp.getFieldSchema().alias, null, groupby);
+ exprPlansCopy.put(0, epGrp);
+ }
+
+ // build group by operator
+ try {
+ return buildGroupOp(loc, (LOCogroup) groupby, op.getAlias(), inpAliases, exprPlansCopy,
+ GROUPTYPE.REGULAR, innerFlags, null);
+ } catch (ParserValidationException pve) {
+ throw new FrontendException(pve);
+ }
}
// User defined schema for generate operator. If not specified output schema
// of UDF will be used which will prefix "dimensions" namespace to all fields
private List<LogicalSchema> getUserDefinedSchema(List<LogicalExpressionPlan> allExprPlan)
- throws FrontendException {
- List<LogicalSchema> genOutputSchema = new ArrayList<LogicalSchema>();
- for (int i = 0; i < allExprPlan.size(); i++) {
- List<Operator> opers = allExprPlan.get(i).getSources();
- for (Operator oper : opers) {
-
- // add a logical schema for dimensions that are pushed from
- // predecessor of cube/rollup
- if (oper instanceof ProjectExpression) {
- LogicalSchema output = new LogicalSchema();
- output.addField(new LogicalFieldSchema(
- ((ProjectExpression) oper).getColAlias(), null, DataType.NULL));
- genOutputSchema.add(output);
- } else if (oper instanceof UserFuncExpression) {
- // add logical schema for dimensions specified in
- // cube/rollup operator
- LogicalSchema output = new LogicalSchema();
- for (Operator op : ((UserFuncExpression) oper).getPlan().getSinks()) {
- output.addField(new LogicalFieldSchema(((ProjectExpression) op)
- .getFieldSchema()));
- }
- genOutputSchema.add(output);
- }
-
- }
- }
- return genOutputSchema;
+ throws FrontendException {
+
+ List<LogicalSchema> genOutputSchema = new ArrayList<LogicalSchema>();
+ for (int i = 0; i < allExprPlan.size(); i++) {
+ List<Operator> opers = allExprPlan.get(i).getSources();
+ for (Operator oper : opers) {
+
+ // add a logical schema for dimensions that are pushed from
+ // predecessor of cube/rollup
+ if (oper instanceof ProjectExpression) {
+ LogicalSchema output = new LogicalSchema();
+ output.addField(new LogicalFieldSchema(
+ ((ProjectExpression) oper).getColAlias(), null, DataType.NULL));
+ genOutputSchema.add(output);
+ } else if (oper instanceof UserFuncExpression) {
+ // add logical schema for dimensions specified in
+ // cube/rollup operator
+ LogicalSchema output = new LogicalSchema();
+ for (Operator op : ((UserFuncExpression) oper).getPlan().getSinks()) {
+ output.addField(new LogicalFieldSchema(((ProjectExpression) op)
+ .getFieldSchema()));
+ }
+ genOutputSchema.add(output);
+ }
+
+ }
+ }
+ return genOutputSchema;
}
private List<LogicalExpression> getProjectExpList(List<LogicalExpressionPlan> lexpPlanList,
- LogicalRelationalOperator lro) throws FrontendException {
+ LogicalRelationalOperator lro) throws FrontendException {
- List<LogicalExpression> leList = new ArrayList<LogicalExpression>();
- for (int i = 0; i < lexpPlanList.size(); i++) {
- LogicalExpressionPlan lexp = lexpPlanList.get(i);
- LogicalExpression lex = (LogicalExpression) lexp.getSources().get(0);
- Iterator<Operator> opers = lexp.getOperators();
-
- // ProjExpr are initially attached to CubeOp. So re-attach it to
- // specified operator
- while (opers.hasNext()) {
- Operator oper = opers.next();
- try {
- ((ProjectExpression) oper).setAttachedRelationalOp(lro);
- } catch (ClassCastException cce) {
- throw new FrontendException("Column project expected.", cce);
- }
- }
+ List<LogicalExpression> leList = new ArrayList<LogicalExpression>();
+ for (int i = 0; i < lexpPlanList.size(); i++) {
+ LogicalExpressionPlan lexp = lexpPlanList.get(i);
+ LogicalExpression lex = (LogicalExpression) lexp.getSources().get(0);
+ Iterator<Operator> opers = lexp.getOperators();
+
+ // ProjExpr are initially attached to CubeOp. So re-attach it to
+ // specified operator
+ while (opers.hasNext()) {
+ Operator oper = opers.next();
+ try {
+ ((ProjectExpression) oper).setAttachedRelationalOp(lro);
+ } catch (ClassCastException cce) {
+ throw new FrontendException("Column project expected.", cce);
+ }
+ }
- leList.add(lex);
- }
+ leList.add(lex);
+ }
- return leList;
+ return leList;
}
// This method connects the predecessors of cube operator with foreach
// operator and disconnects the cube operator from its predecessors
private void injectForeachOperator(SourceLocation loc, LOCube op, LOForEach foreach)
- throws FrontendException {
- // connect the foreach operator with predecessors of cube operator
- List<Operator> opers = op.getPlan().getPredecessors(op);
- for (Operator oper : opers) {
- OperatorPlan foreachPlan = foreach.getPlan();
- foreachPlan.connect(oper, (Operator) foreach);
- }
-
- // disconnect the cube operator from the plan
- opers = foreach.getPlan().getPredecessors(foreach);
- for (Operator lop : opers) {
- List<Operator> succs = lop.getPlan().getSuccessors(lop);
- for (Operator succ : succs) {
- if (succ instanceof LOCube) {
- succ.getPlan().disconnect(lop, succ);
- succ.getPlan().remove(succ);
- }
- }
- }
+ throws FrontendException {
+ // connect the foreach operator with predecessors of cube operator
+ List<Operator> opers = op.getPlan().getPredecessors(op);
+ for (Operator oper : opers) {
+ OperatorPlan foreachPlan = foreach.getPlan();
+ foreachPlan.connect(oper, (Operator) foreach);
+ }
+
+ // disconnect the cube operator from the plan
+ opers = foreach.getPlan().getPredecessors(foreach);
+ for (Operator lop : opers) {
+ List<Operator> succs = lop.getPlan().getSuccessors(lop);
+ for (Operator succ : succs) {
+ if (succ instanceof LOCube) {
+ succ.getPlan().disconnect(lop, succ);
+ succ.getPlan().remove(succ);
+ }
+ }
+ }
}
// This method checks whether the dimensions specified by the user contain duplicates
private void checkDuplicateProject(List<LogicalExpressionPlan> lExprPlan)
- throws FrontendException {
+ throws FrontendException {
+
+ for (int i = 0; i < lExprPlan.size(); i++) {
+ for (int j = i + 1; j < lExprPlan.size(); j++) {
+ LogicalExpression outer = (LogicalExpression) lExprPlan.get(i).getSources().get(0);
+ LogicalExpression inner = (LogicalExpression) lExprPlan.get(j).getSources().get(0);
+ String outColAlias = ((ProjectExpression) outer).getColAlias();
+ String inColAlias = ((ProjectExpression) inner).getColAlias();
- for (int i = 0; i < lExprPlan.size(); i++) {
- for (int j = i + 1; j < lExprPlan.size(); j++) {
- LogicalExpression outer = (LogicalExpression) lExprPlan.get(i).getSources().get(0);
- LogicalExpression inner = (LogicalExpression) lExprPlan.get(j).getSources().get(0);
- String outColAlias = ((ProjectExpression) outer).getColAlias();
- String inColAlias = ((ProjectExpression) inner).getColAlias();
-
- if (outColAlias == null) {
- outColAlias = outer.getFieldSchema().alias;
- }
-
- if (inColAlias == null) {
- inColAlias = inner.getFieldSchema().alias;
- }
-
- if (outColAlias.equals(inColAlias) == true) {
- lExprPlan.remove(j);
- throw new FrontendException("Duplicate dimensions detected. Dimension name: "
- + inColAlias);
- }
- }
- }
+ if (outColAlias == null) {
+ outColAlias = outer.getFieldSchema().alias;
+ }
+
+ if (inColAlias == null) {
+ inColAlias = inner.getFieldSchema().alias;
+ }
+ if (outColAlias.equals(inColAlias) == true) {
+ lExprPlan.remove(j);
+ throw new FrontendException("Duplicate dimensions detected. Dimension name: "
+ + inColAlias);
+ }
+ }
+ }
}
LOCogroup createGroupOp() {
@@ -891,39 +890,63 @@ public class LogicalPlanBuilder {
loFunc,
alias + "_" + newOperatorKey());
op.setTmpLoad(false);
- return buildOp( loc, op, alias, new ArrayList<String>(), null );
+
+ // Check if there's a store in the plan already that this load
+ // depends on. If so, add it as an input alias
+ List<String> inputAliases = new ArrayList<String>();
+
+ // Get list of stores. The stores are not all sinks in the plan
+ // if they've already got successors.
+ Iterator<Operator> itr = plan.getOperators();
+ List<LOStore> stores = new ArrayList<LOStore>();
+ while (itr.hasNext()) {
+ Operator lop = itr.next();
+ if (lop instanceof LOStore) {
+ stores.add((LOStore)lop);
+ }
+ }
+
+ for (LOStore store : stores) {
+ String ifile = op.getFileSpec().getFileName();
+ String ofile = store.getFileSpec().getFileName();
+ if (ofile.equals(ifile)) {
+ inputAliases.add( store.getAlias() );
+ }
+ }
+
+ return buildOp( loc, op, alias, inputAliases, null );
}
private String buildOp(SourceLocation loc, LogicalRelationalOperator op, String alias,
- String inputAlias, String partitioner) throws ParserValidationException {
+ String inputAlias, String partitioner) throws ParserValidationException {
List<String> inputAliases = new ArrayList<String>();
if( inputAlias != null )
inputAliases.add( inputAlias );
return buildOp( loc, op, alias, inputAliases, partitioner );
}
-
- private void checkDuplicateAliases(List<String> inputAliases, SourceLocation loc,
- String opName) throws ParserValidationException {
- //Keep the count of the number of times the same Alias is used
- Map<Operator, Integer> inputAliasesMap = new HashMap<Operator, Integer>();
+
+ private void checkDuplicateAliases(List<String> inputAliases, SourceLocation loc,
+ String opName) throws ParserValidationException {
+ //Keep the count of the number of times the same Alias is used
+ Map<Operator, Integer> inputAliasesMap = new HashMap<Operator, Integer>();
for(String a : inputAliases) {
- Operator pred = operators.get( a );
- if (pred == null) {
- throw new ParserValidationException( intStream, loc, "Unrecognized alias " + a );
- }
- if (inputAliasesMap.containsKey(pred)) {
- throw new ParserValidationException( intStream, loc,
- "Pig does not accept same alias as input for " + opName +
- " operation : " + a );
- }
- else {
+ Operator pred = operators.get( a );
+ if (pred == null) {
+ throw new ParserValidationException( intStream, loc, "Unrecognized alias " + a );
+ }
+ if (inputAliasesMap.containsKey(pred)) {
+ throw new ParserValidationException( intStream, loc,
+ "Pig does not accept same alias as input for " + opName +
+ " operation : " + a );
+ }
+ else {
inputAliasesMap.put(pred, 1);
}
}
}
-
+
private String buildOp(SourceLocation loc, LogicalRelationalOperator op, String alias,
- List<String> inputAliases, String partitioner) throws ParserValidationException {
+ List<String> inputAliases, String partitioner) throws ParserValidationException {
setAlias( op, alias );
setPartitioner( op, partitioner );
op.setLocation( loc );
@@ -944,7 +967,8 @@ public class LogicalPlanBuilder {
throws ParserValidationException {
try {
// Load StoreFunc class from default properties if funcSpec is null. Fallback on PigStorage if StoreFunc is not specified in properties.
- funcSpec = funcSpec == null ? new FuncSpec(pigContext.getProperties().getProperty(PigConfiguration.PIG_DEFAULT_STORE_FUNC, PigStorage.class.getName())) : funcSpec;
+ funcSpec = funcSpec == null ? new FuncSpec(pigContext.getProperties().getProperty(
+ PigConfiguration.PIG_DEFAULT_STORE_FUNC, PigStorage.class.getName())) : funcSpec;
StoreFuncInterface stoFunc = (StoreFuncInterface)PigContext.instantiateFuncFromSpec(funcSpec);
String fileNameKey = inputAlias + "_" + (storeIndex++) ;
@@ -969,10 +993,10 @@ public class LogicalPlanBuilder {
throw new ParserValidationException(intStream, loc, ex);
}
}
-
- String buildAssertOp(SourceLocation loc, LOFilter filterOp,
+
+ String buildAssertOp(SourceLocation loc, LOFilter filterOp,
String alias, String inputAlias, LogicalExpression expr, String comment,
- LogicalExpressionPlan exprPlan)
+ LogicalExpressionPlan exprPlan)
throws ParserValidationException {
try {
filterOp.setAlias(inputAlias);
@@ -1344,7 +1368,7 @@ public class LogicalPlanBuilder {
* @throws RecognitionException
*/
LogicalExpression buildProjectExpr(SourceLocation loc, LogicalExpressionPlan plan, LogicalRelationalOperator op,
- Map<String, Operator> operators, Map<String, LogicalExpressionPlan> exprPlans, String colAlias, int col)
+ Map<String, Operator> operators, Map<String, LogicalExpressionPlan> exprPlans, String colAlias, int col)
throws RecognitionException {
ProjectExpression result = null;
@@ -1478,7 +1502,6 @@ public class LogicalPlanBuilder {
throw new ParserValidationException(intStream, loc, e);
}
-
return proj;
}
@@ -1633,7 +1656,7 @@ public class LogicalPlanBuilder {
}
static LOForEach createNestedForeachOp(LogicalPlan plan) {
- return new LOForEach(plan);
+ return new LOForEach(plan);
}
Operator buildNestedSortOp(SourceLocation loc, LOSort op, LogicalPlan plan, String alias, Operator inputOp,
@@ -1651,12 +1674,11 @@ public class LogicalPlanBuilder {
}
Operator buildNestedForeachOp(SourceLocation loc, LOForEach op, LogicalPlan plan, String alias,
- Operator inputOp, LogicalPlan innerPlan)
- throws ParserValidationException
- {
- op.setInnerPlan(innerPlan);
- buildNestedOp(loc, plan, op, alias, inputOp);
- return op;
+ Operator inputOp, LogicalPlan innerPlan)
+ throws ParserValidationException {
+ op.setInnerPlan(innerPlan);
+ buildNestedOp(loc, plan, op, alias, inputOp);
+ return op;
}
Operator buildNestedProjectOp(SourceLocation loc, LogicalPlan innerPlan, LOForEach foreach,
@@ -1740,19 +1762,19 @@ public class LogicalPlanBuilder {
String modifier = unquote( hint );
if( modifier.equalsIgnoreCase( "repl" ) || modifier.equalsIgnoreCase( "replicated" ) ) {
- return JOINTYPE.REPLICATED;
- } else if( modifier.equalsIgnoreCase( "hash" ) || modifier.equalsIgnoreCase( "default" ) ) {
- return LOJoin.JOINTYPE.HASH;
- } else if( modifier.equalsIgnoreCase( "skewed" ) ) {
- return JOINTYPE.SKEWED;
- } else if (modifier.equalsIgnoreCase("merge")) {
- return JOINTYPE.MERGE;
- } else if (modifier.equalsIgnoreCase("merge-sparse")) {
- return JOINTYPE.MERGESPARSE;
- } else {
- throw new ParserValidationException( intStream, loc,
+ return JOINTYPE.REPLICATED;
+ } else if( modifier.equalsIgnoreCase( "hash" ) || modifier.equalsIgnoreCase( "default" ) ) {
+ return LOJoin.JOINTYPE.HASH;
+ } else if( modifier.equalsIgnoreCase( "skewed" ) ) {
+ return JOINTYPE.SKEWED;
+ } else if (modifier.equalsIgnoreCase("merge")) {
+ return JOINTYPE.MERGE;
+ } else if (modifier.equalsIgnoreCase("merge-sparse")) {
+ return JOINTYPE.MERGESPARSE;
+ } else {
+ throw new ParserValidationException( intStream, loc,
"Only REPL, REPLICATED, HASH, SKEWED, MERGE, and MERGE-SPARSE are vaild JOIN modifiers." );
- }
+ }
}
void putOperator(String alias, Operator op) {
Modified: pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java?rev=1593741&r1=1593740&r2=1593741&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Sat May 10 22:44:36 2014
@@ -37,6 +37,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.apache.pig.builtin.mock.Storage;
@RunWith(JUnit4.class)
public class TestMultiQuery {
@@ -53,14 +54,14 @@ public class TestMultiQuery {
props.setProperty("opt.multiquery", ""+true);
myPig = new PigServer(ExecType.LOCAL, props);
}
-
+
@AfterClass
public static void tearDownAfterClass() throws Exception {
Util.deleteFile(new PigContext(ExecType.LOCAL, new Properties()), "passwd");
Util.deleteFile(new PigContext(ExecType.LOCAL, new Properties()), "passwd2");
deleteOutputFiles();
}
-
+
@Before
public void setUp() throws Exception {
deleteOutputFiles();
@@ -75,9 +76,9 @@ public class TestMultiQuery {
public void testMultiQueryJiraPig1438() throws Exception {
// test case: merge multiple distinct jobs
-
+
String INPUT_FILE = "abc";
-
+
String[] inputData = {
"1\t2\t3",
"2\t3\t4",
@@ -85,9 +86,9 @@ public class TestMultiQuery {
"2\t3\t4",
"1\t2\t3"
};
-
+
Util.createLocalInputFile(INPUT_FILE, inputData);
-
+
myPig.setBatchOn();
myPig.registerQuery("A = load '" + INPUT_FILE + "' as (col1:int, col2:int, col3:int);");
@@ -98,83 +99,83 @@ public class TestMultiQuery {
myPig.registerQuery("D1 = foreach C1 generate col1, col2;");
myPig.registerQuery("D2 = foreach C2 generate col2, col3;");
myPig.registerQuery("store D1 into 'output1';");
- myPig.registerQuery("store D2 into 'output2';");
-
+ myPig.registerQuery("store D2 into 'output2';");
+
myPig.executeBatch();
-
- myPig.registerQuery("E = load 'output1' as (a:int, b:int);");
+
+ myPig.registerQuery("E = load 'output1' as (a:int, b:int);");
Iterator<Tuple> iter = myPig.openIterator("E");
List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
- new String[] {
+ new String[] {
"(1,2)",
"(2,3)"
});
-
+
int counter = 0;
while (iter.hasNext()) {
- assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
+ assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
}
assertEquals(expectedResults.size(), counter);
-
- myPig.registerQuery("E = load 'output2' as (a:int, b:int);");
+
+ myPig.registerQuery("E = load 'output2' as (a:int, b:int);");
iter = myPig.openIterator("E");
expectedResults = Util.getTuplesFromConstantTupleStrings(
- new String[] {
+ new String[] {
"(2,3)",
"(3,4)"
});
-
+
counter = 0;
while (iter.hasNext()) {
- assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
+ assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
}
assertEquals(expectedResults.size(), counter);
}
-
+
@Test
public void testMultiQueryJiraPig1252() throws Exception {
// test case: Problems with secondary key optimization and multiquery
// diamond optimization
-
+
String INPUT_FILE = "abc";
-
+
String[] inputData = {
"1\t2\t3",
"2\t3\t4",
"3\t\t5",
"5\t6\t6",
- "6\t\t7"
+ "6\t\t7"
};
-
+
Util.createLocalInputFile(INPUT_FILE, inputData);
myPig.setBatchOn();
myPig.registerQuery("A = load '" + INPUT_FILE + "' as (col1, col2, col3);");
myPig.registerQuery("B = foreach A generate (chararray) col1, " +
- "(chararray) ((col2 is not null) ? " +
- "col2 : (col3 < 6 ? col3 : '')) as splitcond;");
+ "(chararray) ((col2 is not null) ? " +
+ "col2 : (col3 < 6 ? col3 : '')) as splitcond;");
myPig.registerQuery("split B into C if splitcond != '', D if splitcond == '';");
myPig.registerQuery("E = group C by splitcond;");
myPig.registerQuery("F = foreach E { orderedData = order C by $1, $0; generate flatten(orderedData); };");
-
+
Iterator<Tuple> iter = myPig.openIterator("F");
List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
- new String[] {
+ new String[] {
"(1,2)",
"(2,3)",
"(3,5)",
"(5,6)"
});
-
+
int counter = 0;
while (iter.hasNext()) {
- assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
+ assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
}
assertEquals(expectedResults.size(), counter);
@@ -184,22 +185,22 @@ public class TestMultiQuery {
public void testMultiQueryJiraPig1169() throws Exception {
// test case: Problems with some top N queries
-
+
String INPUT_FILE = "abc";
-
+
String[] inputData = {
"1\t2\t3",
"2\t3\t4",
"3\t4\t5",
"5\t6\t7",
- "6\t7\t8"
+ "6\t7\t8"
};
-
+
Util.createLocalInputFile(INPUT_FILE, inputData);
-
+
myPig.setBatchOn();
- myPig.registerQuery("A = load '" + INPUT_FILE
+ myPig.registerQuery("A = load '" + INPUT_FILE
+ "' as (a:int, b, c);");
myPig.registerQuery("A1 = Order A by a desc parallel 3;");
myPig.registerQuery("A2 = limit A1 2;");
@@ -209,105 +210,105 @@ public class TestMultiQuery {
myPig.executeBatch();
myPig.registerQuery("B = load 'output2' as (a:int, b, c);");
-
+
Iterator<Tuple> iter = myPig.openIterator("B");
List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
- new String[] {
+ new String[] {
"(6,7,8)",
"(5,6,7)"
});
-
+
int counter = 0;
while (iter.hasNext()) {
- assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
+ assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
}
assertEquals(expectedResults.size(), counter);
}
-
+
@Test
public void testMultiQueryJiraPig1171() throws Exception {
// test case: Problems with some top N queries
-
+
String INPUT_FILE = "abc";
-
+
String[] inputData = {
"1\tapple\t3",
"2\torange\t4",
- "3\tpersimmon\t5"
+ "3\tpersimmon\t5"
};
-
+
Util.createLocalInputFile(INPUT_FILE, inputData);
myPig.setBatchOn();
- myPig.registerQuery("A = load '" + INPUT_FILE
+ myPig.registerQuery("A = load '" + INPUT_FILE
+ "' as (a:long, b, c);");
myPig.registerQuery("A1 = Order A by a desc;");
myPig.registerQuery("A2 = limit A1 1;");
- myPig.registerQuery("B = load '" + INPUT_FILE
+ myPig.registerQuery("B = load '" + INPUT_FILE
+ "' as (a:long, b, c);");
myPig.registerQuery("B1 = Order B by a desc;");
myPig.registerQuery("B2 = limit B1 1;");
-
+
myPig.registerQuery("C = cross A2, B2;");
-
+
Iterator<Tuple> iter = myPig.openIterator("C");
List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
- new String[] {
+ new String[] {
"(3L,'persimmon',5,3L,'persimmon',5)"
});
-
+
int counter = 0;
while (iter.hasNext()) {
- assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
+ assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
}
assertEquals(expectedResults.size(), counter);
}
-
+
@Test
public void testMultiQueryJiraPig1157() throws Exception {
// test case: Successive replicated joins do not generate Map Reduce plan and fail due to OOM
-
+
String INPUT_FILE = "abc";
String INPUT_FILE_1 = "abc";
-
+
String[] inputData = {
"1\tapple\t3",
"2\torange\t4",
- "3\tpersimmon\t5"
+ "3\tpersimmon\t5"
};
-
+
Util.createLocalInputFile(INPUT_FILE, inputData);
myPig.setBatchOn();
- myPig.registerQuery("A = load '" + INPUT_FILE
+ myPig.registerQuery("A = load '" + INPUT_FILE
+ "' as (a:long, b, c);");
myPig.registerQuery("A1 = FOREACH A GENERATE a;");
myPig.registerQuery("B = GROUP A1 BY a;");
- myPig.registerQuery("C = load '" + INPUT_FILE_1
+ myPig.registerQuery("C = load '" + INPUT_FILE_1
+ "' as (x:long, y);");
- myPig.registerQuery("D = JOIN C BY x, B BY group USING 'replicated';");
- myPig.registerQuery("E = JOIN A BY a, D by x USING 'replicated';");
-
+ myPig.registerQuery("D = JOIN C BY x, B BY group USING 'replicated';");
+ myPig.registerQuery("E = JOIN A BY a, D by x USING 'replicated';");
+
Iterator<Tuple> iter = myPig.openIterator("E");
List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
- new String[] {
+ new String[] {
"(1L,'apple',3,1L,'apple',1L,{(1L)})",
"(2L,'orange',4,2L,'orange',2L,{(2L)})",
"(3L,'persimmon',5,3L,'persimmon',3L,{(3L)})"
});
-
+
int counter = 0;
while (iter.hasNext()) {
- assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
+ assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
}
assertEquals(expectedResults.size(), counter);
@@ -316,7 +317,7 @@ public class TestMultiQuery {
@Test
public void testMultiQueryJiraPig1068() throws Exception {
- // test case: COGROUP fails with 'Type mismatch in key from map:
+ // test case: COGROUP fails with 'Type mismatch in key from map:
// expected org.apache.pig.impl.io.NullableText, recieved org.apache.pig.impl.io.NullableTuple'
String INPUT_FILE = "pig-1068.txt";
@@ -324,36 +325,36 @@ public class TestMultiQuery {
String[] inputData = {
"10\tapple\tlogin\tjar",
"20\torange\tlogin\tbox",
- "30\tstrawberry\tquit\tbot"
+ "30\tstrawberry\tquit\tbot"
};
-
+
Util.createLocalInputFile(INPUT_FILE, inputData);
myPig.setBatchOn();
- myPig.registerQuery("logs = load '" + INPUT_FILE
+ myPig.registerQuery("logs = load '" + INPUT_FILE
+ "' as (ts:int, id:chararray, command:chararray, comments:chararray);");
myPig.registerQuery("SPLIT logs INTO logins IF command == 'login', all_quits IF command == 'quit';");
- myPig.registerQuery("login_info = FOREACH logins { GENERATE id as id, comments AS client; };");
+ myPig.registerQuery("login_info = FOREACH logins { GENERATE id as id, comments AS client; };");
myPig.registerQuery("logins_grouped = GROUP login_info BY (id, client);");
myPig.registerQuery("count_logins_by_client = FOREACH logins_grouped "
+ "{ generate group.id AS id, group.client AS client, COUNT($1) AS count; };");
myPig.registerQuery("all_quits_grouped = GROUP all_quits BY id; ");
myPig.registerQuery("quits = FOREACH all_quits_grouped { GENERATE FLATTEN(all_quits); };");
myPig.registerQuery("joined_session_info = COGROUP quits BY id, count_logins_by_client BY id;");
-
+
Iterator<Tuple> iter = myPig.openIterator("joined_session_info");
List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
- new String[] {
+ new String[] {
"('apple',{},{('apple','jar',1L)})",
"('orange',{},{('orange','box',1L)})",
"('strawberry',{(30,'strawberry','quit','bot')},{})"
});
-
+
int counter = 0;
while (iter.hasNext()) {
- assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
+ assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
}
assertEquals(expectedResults.size(), counter);
@@ -364,22 +365,22 @@ public class TestMultiQuery {
myPig.setBatchOn();
- myPig.registerQuery("a = load 'passwd' "
+ myPig.registerQuery("a = load 'passwd' "
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
myPig.registerQuery("split a into plan1 if (uid > 5), plan2 if ( uid < 5);");
myPig.registerQuery("b = group plan1 by uname;");
- myPig.registerQuery("c = foreach b { tmp = order plan1 by uid desc; "
+ myPig.registerQuery("c = foreach b { tmp = order plan1 by uid desc; "
+ "generate flatten(group) as foo, tmp; };");
myPig.registerQuery("d = filter c BY foo is not null;");
myPig.registerQuery("store d into 'output1';");
myPig.registerQuery("store plan2 into 'output2';");
-
+
List<ExecJob> jobs = myPig.executeBatch();
for (ExecJob job : jobs) {
assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
}
- }
-
+ }
+
@Test
public void testMultiQueryJiraPig1114() throws Exception {
@@ -390,9 +391,9 @@ public class TestMultiQuery {
String[] inputData = {
"10\tjar",
"20\tbox",
- "30\tbot"
+ "30\tbot"
};
-
+
Util.createLocalInputFile(INPUT_FILE, inputData);
myPig.setBatchOn();
@@ -433,39 +434,39 @@ public class TestMultiQuery {
String INPUT_FILE_1 = "set1.txt";
String INPUT_FILE_2 = "set2.txt";
-
+
String[] inputData_1 = {
"login\t0\tjar",
"login\t1\tbox",
- "quit\t0\tmany"
+ "quit\t0\tmany"
};
-
+
Util.createLocalInputFile(INPUT_FILE_1, inputData_1);
-
+
String[] inputData_2 = {
"apple\tlogin\t{(login)}",
"orange\tlogin\t{(login)}",
- "strawberry\tquit\t{(login)}"
+ "strawberry\tquit\t{(login)}"
};
-
+
Util.createLocalInputFile(INPUT_FILE_2, inputData_2);
-
+
myPig.setBatchOn();
- myPig.registerQuery("set1 = load '" + INPUT_FILE_1
+ myPig.registerQuery("set1 = load '" + INPUT_FILE_1
+ "' USING PigStorage as (a:chararray, b:chararray, c:chararray);");
myPig.registerQuery("set2 = load '" + INPUT_FILE_2
+ "' USING PigStorage as (a: chararray, b:chararray, c:bag{});");
- myPig.registerQuery("set2_1 = FOREACH set2 GENERATE a as f1, b as f2, "
+ myPig.registerQuery("set2_1 = FOREACH set2 GENERATE a as f1, b as f2, "
+ "(chararray) 0 as f3;");
myPig.registerQuery("set2_2 = FOREACH set2 GENERATE a as f1, "
- + "FLATTEN((IsEmpty(c) ? null : c)) as f2, (chararray) 1 as f3;");
+ + "FLATTEN((IsEmpty(c) ? null : c)) as f2, (chararray) 1 as f3;");
myPig.registerQuery("all_set2 = UNION set2_1, set2_2;");
myPig.registerQuery("joined_sets = JOIN set1 BY (a,b), all_set2 BY (f2,f3);");
-
+
List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
- new String[] {
+ new String[] {
"('quit','0','many','strawberry','quit','0')",
"('login','0','jar','apple','login','0')",
"('login','0','jar','orange','login','0')",
@@ -473,7 +474,7 @@ public class TestMultiQuery {
"('login','1','box','orange','login','1')",
"('login','1','box','strawberry','login','1')"
});
-
+
Iterator<Tuple> iter = myPig.openIterator("joined_sets");
int count = 0;
while (iter.hasNext()) {
@@ -481,11 +482,11 @@ public class TestMultiQuery {
}
assertEquals(expectedResults.size(), count);
}
-
+
@Test
public void testMultiQueryJiraPig1060_2() throws Exception {
- // test case:
+ // test case:
String INPUT_FILE = "pig-1060.txt";
@@ -495,9 +496,9 @@ public class TestMultiQuery {
"orange\t3",
"orange\t23",
"strawberry\t10",
- "strawberry\t34"
+ "strawberry\t34"
};
-
+
Util.createLocalInputFile(INPUT_FILE, inputData);
myPig.setBatchOn();
@@ -530,7 +531,7 @@ public class TestMultiQuery {
for (ExecJob job : jobs) {
assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
}
- }
+ }
@Test
public void testMultiQueryJiraPig920_2() throws Exception {
@@ -551,27 +552,27 @@ public class TestMultiQuery {
myPig.registerQuery("g = cogroup d by $0, e by $0;");
myPig.registerQuery("g1 = foreach g generate group, COUNT(d), COUNT(e);");
myPig.registerQuery("store g1 into 'output2';");
-
+
List<ExecJob> jobs = myPig.executeBatch();
for (ExecJob job : jobs) {
assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
}
- }
-
+ }
+
@Test
public void testMultiQueryJiraPig920_3() throws Exception {
// test case: execution of a simple diamond query
-
+
String INPUT_FILE = "pig-920.txt";
-
+
String[] inputData = {
"apple\tapple\t100\t10",
"apple\tapple\t200\t20",
"orange\torange\t100\t10",
- "orange\torange\t300\t20"
+ "orange\torange\t300\t20"
};
-
+
Util.createLocalInputFile(INPUT_FILE, inputData);
myPig.setBatchOn();
@@ -582,22 +583,22 @@ public class TestMultiQuery {
myPig.registerQuery("c = filter a by gid > 10;");
myPig.registerQuery("d = cogroup c by $0, b by $0;");
myPig.registerQuery("e = foreach d generate group, COUNT(c), COUNT(b);");
-
+
Iterator<Tuple> iter = myPig.openIterator("e");
List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
- new String[] {
+ new String[] {
"('apple',1L,2L)",
"('orange',1L,1L)"
});
-
+
int counter = 0;
while (iter.hasNext()) {
assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
}
assertEquals(expectedResults.size(), counter);
- }
+ }
@Test
public void testMultiQueryJiraPig976() throws Exception {
@@ -625,7 +626,7 @@ public class TestMultiQuery {
@Test
public void testMultiQueryJiraPig976_2() throws Exception {
- // test case: key ('group') isn't part of foreach output
+ // test case: key ('group') isn't part of foreach output
// and keys have different types
myPig.setBatchOn();
@@ -671,7 +672,7 @@ public class TestMultiQuery {
public void testMultiQueryJiraPig976_4() throws Exception {
// test case: group by multi-cols and key ('group') isn't part of output
-
+
myPig.setBatchOn();
myPig.registerQuery("a = load 'passwd' " +
@@ -688,7 +689,7 @@ public class TestMultiQuery {
assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
}
}
-
+
@Test
public void testMultiQueryJiraPig976_5() throws Exception {
@@ -718,7 +719,7 @@ public class TestMultiQuery {
// test case: key ('group') has null values.
String INPUT_FILE = "pig-976.txt";
-
+
String[] inputData = {
"apple\tapple\t100\t10",
"apple\tapple\t\t20",
@@ -726,9 +727,9 @@ public class TestMultiQuery {
"orange\torange\t\t20",
"strawberry\tstrawberry\t300\t10"
};
-
+
Util.createLocalInputFile(INPUT_FILE, inputData);
-
+
myPig.setBatchOn();
myPig.registerQuery("a = load '" + INPUT_FILE +
@@ -742,11 +743,11 @@ public class TestMultiQuery {
List<ExecJob> jobs = myPig.executeBatch();
assertTrue(jobs.size() == 2);
-
+
for (ExecJob job : jobs) {
assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
}
- }
+ }
@Test
public void testMultiQueryJiraPig983_2() throws Exception {
@@ -766,15 +767,50 @@ public class TestMultiQuery {
myPig.registerQuery("f = group d by c::gid;");
myPig.registerQuery("f1 = foreach f generate group, SUM(d.c::uid);");
myPig.registerQuery("store f1 into 'output2';");
-
+
List<ExecJob> jobs = myPig.executeBatch();
assertTrue(jobs.size() == 2);
-
+
for (ExecJob job : jobs) {
assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
}
- }
+ }
+
+ /**
+ This test will fail nondeterministically without the PIG-3902 patch. Sometimes
+ the loads will get attached as dependencies to the appropriate stores (during
+ the postProcess() method on PigServer.Graph) and the dag will successfully
+ run, while other times it will fail (if the loads get attached as dependencies
+ of the incorrect stores).
+ */
+ @Test
+ public void testMultiQueryJiraPig3902() throws Exception {
+
+ // test case: Pig Server creates implicit cycle when
+ // loading and storing from same location with an
+ // intermediate store.
+
+ Storage.Data data = Storage.resetData(myPig);
+ data.set("inputLocation", Storage.tuple(1,2,3));
+
+ myPig.setBatchOn();
+ myPig.registerQuery("A = load 'inputLocation' using mock.Storage() as (a:int, b, c);");
+ myPig.registerQuery("A1 = order A by a desc parallel 3;");
+ myPig.registerQuery("A2 = limit A1 2;");
+ myPig.registerQuery("store A1 into 'output1' using mock.Storage();");
+ myPig.registerQuery("A3 = load 'output1' using mock.Storage()as (a:int, b, c);");
+ myPig.registerQuery("A4 = filter A3 by a > 2;");
+ myPig.registerQuery("store A4 into 'inputLocation' using mock.Storage();");
+
+ List<ExecJob> jobs = myPig.executeBatch();
+
+ assertTrue(jobs.size() == 2);
+
+ for (ExecJob job : jobs) {
+ assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+ }
+ }
// --------------------------------------------------------------------------
// Helper methods
Modified: pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java?rev=1593741&r1=1593740&r2=1593741&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java Sat May 10 22:44:36 2014
@@ -162,10 +162,6 @@ public class TestNativeMapReduce {
assertFalse(iter.hasNext());
- // We have to manually delete intermediate mapreduce files
- Util.deleteFile(cluster,"table_testNativeMRJobSimple_input");
- Util.deleteFile(cluster,"table_testNativeMRJobSimple_output");
-
// check in interactive mode
iter = pigServer.openIterator("B");
@@ -259,9 +255,6 @@ public class TestNativeMapReduce {
assertFalse(iter.hasNext());
- Util.deleteFile(cluster,"table_testNativeMRJobMultiStoreOnPred_input");
- Util.deleteFile(cluster,"table_testNativeMRJobMultiStoreOnPred_output");
-
// check in interactive mode
iter = pigServer.openIterator("B");