You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/06/15 02:49:01 UTC
svn commit: r954681 - in /hadoop/pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/impl/logicalLayer/ src/org/apache/pig/tools/grunt/
src/org/apache/pig/tools/pigscript/parser/ test/org/apache/pig/test/
Author: daijy
Date: Tue Jun 15 00:49:00 2010
New Revision: 954681
URL: http://svn.apache.org/viewvc?rev=954681&view=rev
Log:
PIG-972: Make describe work with nested foreach
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/PigServer.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java
hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
hadoop/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=954681&r1=954680&r2=954681&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Jun 15 00:49:00 2010
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-972: Make describe work with nested foreach (aniket486 via daijy)
+
PIG-1438: [Performance] MultiQueryOptimizer should also merge DISTINCT jobs
(rding)
Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=954681&r1=954680&r2=954681&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Tue Jun 15 00:49:00 2010
@@ -72,6 +72,7 @@ import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.LOConst;
+import org.apache.pig.impl.logicalLayer.LOForEach;
import org.apache.pig.impl.logicalLayer.LOLimit;
import org.apache.pig.impl.logicalLayer.LOLoad;
import org.apache.pig.impl.logicalLayer.LOSort;
@@ -593,6 +594,27 @@ public class PigServer {
throw new FrontendException (msg, errCode, PigException.INPUT, false, null, fee);
}
}
+
+ /**
+ * Write the schema for a nested alias to System.out. The nested alias is addressed as alias::nestedAlias.
+ * @param alias Alias of the foreach statement that contains nestedAlias
+ * @param nestedAlias Nested alias whose schema will be written out
+ * @return Schema of the nested alias that was written out
+ * @throws IOException
+ */
+ public Schema dumpSchemaNested(String alias, String nestedAlias) throws IOException{
+ LogicalPlan lp = getPlanFromAlias(alias, "describe");
+ lp = compileLp(alias, false);
+ LogicalOperator op = lp.getLeaves().get(0);
+ if(op instanceof LOForEach) {
+ return ((LOForEach)op).dumpNestedSchema(nestedAlias);
+ }
+ else {
+ int errCode = 1001;
+ String msg = "Unable to describe schema for " + alias + "::" + nestedAlias;
+ throw new FrontendException (msg, errCode, PigException.INPUT, false, null);
+ }
+ }
/**
* Set the name of the job. This name will get translated to mapred.job.name.
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java?rev=954681&r1=954680&r2=954681&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java Tue Jun 15 00:49:00 2010
@@ -17,18 +17,23 @@
*/
package org.apache.pig.impl.logicalLayer;
+import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.HashMap;
import java.util.Set;
-import java.util.Iterator;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.pig.PigException;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.optimizer.SchemaRemover;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.SchemaMergeException;
-import org.apache.pig.impl.logicalLayer.optimizer.SchemaRemover;
import org.apache.pig.impl.plan.Operator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
@@ -37,9 +42,6 @@ import org.apache.pig.impl.plan.Required
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.MultiMap;
import org.apache.pig.impl.util.Pair;
-import org.apache.pig.data.DataType;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
public class LOForEach extends RelationalOperator {
@@ -434,7 +436,71 @@ public class LOForEach extends Relationa
super.unsetSchema();
mSchemaPlanMapping = new ArrayList<LogicalPlan>();
}
-
+
+ private void doAllSuccessors(LogicalPlan lp,
+ LogicalOperator node,
+ Set<LogicalOperator> seen,
+ Collection<LogicalOperator> fifo) throws VisitorException {
+ if (!seen.contains(node)) {
+ // We haven't seen this one before.
+ Collection<LogicalOperator> succs = lp.getSuccessors(node);
+ if (succs != null && succs.size() > 0) {
+ // Do all our successors before ourself
+ for (LogicalOperator op : succs) {
+ doAllSuccessors(lp, op, seen, fifo);
+ }
+ }
+ // Now do ourself
+ seen.add(node);
+ fifo.add(node);
+ }
+ }
+
+ public Schema dumpNestedSchema(String nestedAlias) throws IOException {
+ boolean found = false;
+ // To avoid non-deterministic traversal,
+ // we traverse each inner plan from leaf to root, using logic borrowed from ReverseDependencyOrderWalker;
+ // this way the schema we print is always the latest schema in script order.
+ // Also, since we do not allow union, join, cogroup, cross etc. as part of an inner plan,
+ // the inner plan is a tree (not a DAG), and hence the traversal is simpler.
+
+ for(LogicalPlan lp : mForEachPlans) {
+ // The following walk is highly inefficient: we build a fifo list of all the elements
+ // we need to traverse and only then look for the matching element,
+ // but this should be fine as our inner plans are expected to be small.
+ // Also, although we are sure that the inner plan is a tree rather than a DAG,
+ // we keep the algorithm general enough to handle a DAG, to avoid bugs later.
+ // This logic is borrowed from ReverseDependencyOrderWalker ;)
+ List<LogicalOperator> fifo = new ArrayList<LogicalOperator>();
+ Set<LogicalOperator> seen = new HashSet<LogicalOperator>();
+ for(LogicalOperator op : lp.getRoots()) {
+ doAllSuccessors(lp, op, seen, fifo);
+ }
+ for(LogicalOperator op: fifo) {
+ if(!(op instanceof LOProject) && nestedAlias.equalsIgnoreCase(op.mAlias)) {
+ found = true;
+ // Expression operators do not have any schema
+ if(!(op instanceof ExpressionOperator)) {
+ Schema nestedSc = op.getSchema();
+ System.out.println(nestedAlias + ": " + nestedSc.toString());
+ return nestedSc;
+ }
+ else {
+ int errCode = 1113;
+ String msg = "Unable to describe schema for nested expression "+ nestedAlias;
+ throw new FrontendException (msg, errCode, PigException.INPUT, false, null);
+ }
+ }
+ }
+ if(!found) {
+ int errCode = 1114;
+ String msg = "Unable to find schema for nested alias "+ nestedAlias;
+ throw new FrontendException (msg, errCode, PigException.INPUT, false, null);
+ }
+ }
+ return null;
+ }
+
/**
* @see org.apache.pig.impl.plan.Operator#clone()
* Do not use the clone method directly. Operators are cloned when logical plans
Modified: hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java?rev=954681&r1=954680&r2=954681&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java Tue Jun 15 00:49:00 2010
@@ -227,11 +227,19 @@ public class GruntParser extends PigScri
@Override
protected void processDescribe(String alias) throws IOException {
+ String nestedAlias = null;
if(mExplain == null) { // process only if not in "explain" mode
if(alias==null) {
alias = mPigServer.getPigContext().getLastAlias();
}
- mPigServer.dumpSchema(alias);
+ if(alias.contains("::")) {
+ nestedAlias = alias.substring(alias.indexOf("::") + 2);
+ alias = alias.substring(0, alias.indexOf("::"));
+ mPigServer.dumpSchemaNested(alias, nestedAlias);
+ }
+ else {
+ mPigServer.dumpSchema(alias);
+ }
} else {
log.warn("'describe' statement is ignored while processing 'explain -script' or '-check'");
}
Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj?rev=954681&r1=954680&r2=954681&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj Tue Jun 15 00:49:00 2010
@@ -355,7 +355,7 @@ TOKEN:
| <#NUMBER: <INTEGER> | <FLOAT> | <FLOAT> ( ["e","E"] ([ "-","+"])? <FLOAT> )?>
}
-TOKEN: {<IDENTIFIER: (<LETTER>)+(<DIGIT> | <LETTER> | <SPECIALCHAR>)*>}
+TOKEN: {<IDENTIFIER: (<LETTER>)+(<DIGIT> | <LETTER> | <SPECIALCHAR> | "::")*>}
TOKEN: {<PATH: (~["(", ")", ";", "\r", " ", "\t", "\n"])+>}
TOKEN : { <QUOTEDSTRING : "'"
( (~["'","\\","\n","\r"])
@@ -454,8 +454,8 @@ void parse() throws IOException:
{processIllustrate(t1.image);}
|
<DESCRIBE>
- (
t1 = <IDENTIFIER>
+ (
{processDescribe(t1.image);}
|
{processDescribe(null);}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java?rev=954681&r1=954680&r2=954681&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java Tue Jun 15 00:49:00 2010
@@ -41,6 +41,7 @@ import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.test.utils.Identity;
import org.junit.AfterClass;
import org.junit.Before;
@@ -644,4 +645,31 @@ public class TestEvalPipeline2 extends T
Util.deleteFile(cluster, "table_testCustomPartitionerCross");
}
+
+ // See PIG-972
+ @Test
+ public void testDescribeNestedAlias() throws Exception{
+ String[] input = {
+ "1\t3",
+ "2\t4",
+ "3\t5"
+ };
+
+ Util.createInputFile(cluster, "table_testDescribeNestedAlias", input);
+ pigServer.registerQuery("A = LOAD 'table_testDescribeNestedAlias' as (a0, a1);");
+ pigServer.registerQuery("P = GROUP A by a1;");
+ // Test RelationalOperator
+ pigServer.registerQuery("B = FOREACH P { D = ORDER A by $0; generate D.$0; };");
+
+ // Test ExpressionOperator - negative test case
+ pigServer.registerQuery("C = FOREACH A { D = a0/a1; E=a1/a0; generate E as newcol; };");
+ Schema schema = pigServer.dumpSchemaNested("B", "D");
+ assertTrue(schema.toString().equalsIgnoreCase("{a0: bytearray,a1: bytearray}"));
+ try {
+ schema = pigServer.dumpSchemaNested("C", "E");
+ } catch (FrontendException e) {
+ assertTrue(e.getErrorCode() == 1113);
+ }
+ }
+
}