You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/11/20 21:05:09 UTC
svn commit: r719348 - in /hadoop/pig/branches/types: ./
src/org/apache/pig/impl/logicalLayer/optimizer/ test/org/apache/pig/test/
test/org/apache/pig/test/utils/
Author: olga
Date: Thu Nov 20 12:05:09 2008
New Revision: 719348
URL: http://svn.apache.org/viewvc?rev=719348&view=rev
Log:
PIG-537: Failure in Hadoop map collect stage due to type mismatch in the keys used in cogroup
Modified:
hadoop/pig/branches/types/CHANGES.txt
hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/ImplicitSplitInserter.java
hadoop/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java
hadoop/pig/branches/types/test/org/apache/pig/test/TestImplicitSplit.java
hadoop/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java
hadoop/pig/branches/types/test/org/apache/pig/test/Util.java
hadoop/pig/branches/types/test/org/apache/pig/test/utils/Identity.java
Modified: hadoop/pig/branches/types/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/CHANGES.txt?rev=719348&r1=719347&r2=719348&view=diff
==============================================================================
--- hadoop/pig/branches/types/CHANGES.txt (original)
+++ hadoop/pig/branches/types/CHANGES.txt Thu Nov 20 12:05:09 2008
@@ -318,3 +318,6 @@
PIG-528: use UDF return in schema computation (sms via olgan)
PIG-527: allow PigStorage to write out complex output (sms via olgan)
+
+ PIG-537: Failure in Hadoop map collect stage due to type mismatch in the
+ keys used in cogroup (pradeepk via olgan)
Modified: hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/ImplicitSplitInserter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/ImplicitSplitInserter.java?rev=719348&r1=719347&r2=719348&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/ImplicitSplitInserter.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/ImplicitSplitInserter.java Thu Nov 20 12:05:09 2008
@@ -31,6 +31,7 @@
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.plan.optimizer.OptimizerException;
public class ImplicitSplitInserter extends LogicalTransformer {
@@ -69,23 +70,62 @@
List<LogicalOperator> succs =
new ArrayList<LogicalOperator>(mPlan.getSuccessors(nodes.get(0)));
int index = -1;
- boolean nodeConnectedToSplit = false;
+ // For two successors of nodes.get(0) here is a pictorial
+ // representation of the change required:
+ // BEFORE:
+ // Succ1 Succ2
+ // \ /
+ // nodes.get(0)
+
+ // SHOULD BECOME:
+
+ // AFTER:
+ // Succ1 Succ2
+ // | |
+ // SplitOutput SplitOutput
+ // \ /
+ // Split
+ // |
+ // nodes.get(0)
+
+ // Here is how this will be accomplished.
+ // First (the same) Split Operator will be "inserted between" nodes.get(0)
+ // and all its successors. The "insertBetween" API is used which makes sure
+ // the ordering of operators in the graph is preserved. So we get the following:
+ // Succ1 Succ2
+ // | |
+ // Split Split
+ // \ /
+ // nodes.get(0)
+
+ // Then all but the first connection between nodes.get(0) and the Split
+ // Operator are removed using "disconnect" - so we get the following:
+ // Succ1 Succ2
+ // \ /
+ // Split
+ // |
+ // nodes.get(0)
+
+ // Now a new SplitOutputOperator is "inserted between" the Split operator
+ // and the successors. So we get:
+ // Succ1 Succ2
+ // | |
+ // SplitOutput SplitOutput
+ // \ /
+ // Split
+ // |
+ // nodes.get(0)
+
+
+ for (LogicalOperator succ : succs) {
+ mPlan.insertBetween(nodes.get(0), splitOp, succ);
+ }
+
+ for(int i = 1; i < succs.size(); i++) {
+ mPlan.disconnect(nodes.get(0), splitOp);
+ }
+
for (LogicalOperator succ : succs) {
- if(!nodeConnectedToSplit) {
- mPlan.insertBetween(nodes.get(0), splitOp, succ);
- // nodes.get(0) should be connected to Split (only once) and
- // split -> splitoutput -> successor - this is for the first successor
- // for the next successor we just want to connect in the order
- // split -> splitoutput -> successor without involving nodes.get(0)
- // in the above call we have connected
- // nodes.get(0) to split (we will set the flag
- // to true later in this loop iteration). Hence in subsequent
- // iterations we will only disconnect nodes.get(0) from its
- // successor and connect the split-splitoutput chain
- // to the successor
- } else {
- mPlan.disconnect(nodes.get(0), succ);
- }
LogicalPlan condPlan = new LogicalPlan();
LOConst cnst = new LOConst(mPlan, new OperatorKey(scope,
idGen.getNextNodeId(scope)), new Boolean(true));
@@ -95,22 +135,11 @@
new OperatorKey(scope, idGen.getNextNodeId(scope)), ++index, condPlan);
splitOp.addOutput(splitOutput);
mPlan.add(splitOutput);
-
- if(!nodeConnectedToSplit) {
- // node.get(0) should be connected to Split (only once) and
- // split to splitoutput to successor - this is for the first successor
- // for the next successor we just want to connect in the order
- // split - splitoutput - successor.
- // the call below is in the first successor case
- mPlan.insertBetween(splitOp, splitOutput, succ);
- nodeConnectedToSplit = true;
- } else {
- mPlan.connect(splitOp, splitOutput);
- mPlan.connect(splitOutput, succ);
- }
+ mPlan.insertBetween(splitOp, splitOutput, succ);
// Patch up the contained plans of succ
fixUpContainedPlans(nodes.get(0), splitOutput, succ, null);
}
+
} catch (Exception e) {
throw new OptimizerException(e);
}
Modified: hadoop/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java?rev=719348&r1=719347&r2=719348&view=diff
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java (original)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java Thu Nov 20 12:05:09 2008
@@ -50,7 +50,7 @@
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.Pair;
-import org.apache.pig.test.util.Identity;
+import org.apache.pig.test.utils.Identity;
import junit.framework.TestCase;
Modified: hadoop/pig/branches/types/test/org/apache/pig/test/TestImplicitSplit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/TestImplicitSplit.java?rev=719348&r1=719347&r2=719348&view=diff
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/TestImplicitSplit.java (original)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/TestImplicitSplit.java Thu Nov 20 12:05:09 2008
@@ -1,9 +1,13 @@
package org.apache.pig.test;
+import static org.apache.pig.test.utils.TypeCheckingTestUtil.printMessageCollector;
+import static org.apache.pig.test.utils.TypeCheckingTestUtil.printTypeGraph;
+
import java.io.File;
import java.io.FileOutputStream;
import java.io.PrintStream;
+import java.util.HashMap;
import java.util.Iterator;
import junit.framework.TestCase;
@@ -11,6 +15,12 @@
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.PlanSetter;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.validators.TypeCheckingValidator;
+import org.apache.pig.impl.plan.CompilationMessageCollector;
+import org.apache.pig.test.utils.LogicalPlanTester;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -49,4 +59,73 @@
}
assertEquals(20, cnt);
}
+
+ @Test
+ public void testImplicitSplitInCoGroup() throws Exception {
+ // this query is similar to the one reported in JIRA - PIG-537
+ // Create input file
+ File inputA = Util.createInputFile("tmp", "",
+ new String[] {"a:1", "b:2", "b:20", "c:3", "c:30"});
+ File inputB = Util.createInputFile("tmp", "",
+ new String[] {"a:first", "b:second", "c:third"});
+ pigServer.registerQuery("a = load 'file:" + Util.encodeEscape(inputA.toString()) +
+ "' using PigStorage(':') as (name:chararray, marks:int);");
+ pigServer.registerQuery("b = load 'file:" + Util.encodeEscape(inputA.toString()) +
+ "' using PigStorage(':') as (name:chararray, rank:chararray);");
+ pigServer.registerQuery("c = cogroup a by name, b by name;");
+ pigServer.registerQuery("d = foreach c generate group, FLATTEN(a.marks) as newmarks;");
+ pigServer.registerQuery("e = cogroup a by marks, d by newmarks;");
+ pigServer.registerQuery("f = foreach e generate group, flatten(a), flatten(d);");
+ HashMap<Integer, Object[]> results = new HashMap<Integer, Object[]>();
+ results.put(1, new Object[] { "a", 1, "a", 1 });
+ results.put(2, new Object[] { "b", 2, "b", 2 });
+ results.put(3, new Object[] { "c", 3, "c", 3 });
+ results.put(20, new Object[] { "b", 20, "b", 20 });
+ results.put(30, new Object[] { "c", 30, "c", 30 });
+
+ Iterator<Tuple> it = pigServer.openIterator("f");
+ while(it.hasNext()) {
+ Tuple t = it.next();
+ System.err.println("Tuple:" + t);
+ Integer group = (Integer)t.get(0);
+ Object[] groupValues = results.get(group);
+ for(int i = 0; i < 4; i++) {
+ assertEquals(groupValues[i], t.get(i+1));
+ }
+ }
+ }
+
+ @Test
+ public void testImplicitSplitInCoGroup2() throws Exception {
+ // this query is similar to the one reported in JIRA - PIG-537
+ LogicalPlanTester planTester = new LogicalPlanTester();
+ planTester.buildPlan("a = load 'file1' using PigStorage(':') as (name:chararray, marks:int);");
+ planTester.buildPlan("b = load 'file2' using PigStorage(':') as (name:chararray, rank:chararray);");
+ planTester.buildPlan("c = cogroup a by name, b by name;");
+ planTester.buildPlan("d = foreach c generate group, FLATTEN(a.marks) as newmarks;");
+ planTester.buildPlan("e = cogroup a by marks, d by newmarks;");
+ LogicalPlan plan = planTester.buildPlan("f = foreach e generate group, flatten(a), flatten(d);");
+
+ // Set the logical plan values correctly in all the operators
+ PlanSetter ps = new PlanSetter(plan);
+ ps.visit();
+
+ // run through validator
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+
+ if (collector.hasError()) {
+ throw new Exception("Error during type checking") ;
+ }
+
+ // this will run ImplicitSplitInserter
+ TestLogicalOptimizer.optimizePlan(plan);
+
+ // get Schema of leaf and compare:
+ Schema expectedSchema = Util.getSchemaFromString("grp: int,A::username: chararray,A::marks: int,AB::group: chararray,AB::newmarks: int");
+ assertTrue(Schema.equals(expectedSchema, plan.getLeaves().get(0).getSchema(),false, true));
+ }
}
Modified: hadoop/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=719348&r1=719347&r2=719348&view=diff
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java (original)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java Thu Nov 20 12:05:09 2008
@@ -19,7 +19,6 @@
import java.io.IOException;
import java.io.InputStream;
-import java.io.ByteArrayInputStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -51,10 +50,9 @@
import org.apache.pig.impl.logicalLayer.*;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.data.DataType;
-import org.apache.pig.impl.logicalLayer.parser.QueryParser ;
import org.apache.pig.impl.logicalLayer.parser.ParseException ;
import org.apache.pig.impl.util.MultiMap;
-import org.apache.pig.test.util.Identity;
+import org.apache.pig.test.utils.Identity;
public class TestLogicalPlanBuilder extends junit.framework.TestCase {
@@ -1180,70 +1178,70 @@
//the first element in group, i.e., name is renamed as myname
lp = buildPlan("c = foreach b generate flatten(group) as (myname), COUNT(a) as mycount;");
foreach = (LOForEach) lp.getLeaves().get(0);
- assertTrue(foreach.getSchema().equals(getSchemaFromString("myname: chararray, age: int, mycount: long")));
+ assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("myname: chararray, age: int, mycount: long")));
//the first and second elements in group, i.e., name and age are renamed as myname and myage
lp = buildPlan("c = foreach b generate flatten(group) as (myname, myage), COUNT(a) as mycount;");
foreach = (LOForEach) lp.getLeaves().get(0);
- assertTrue(foreach.getSchema().equals(getSchemaFromString("myname: chararray, myage: int, mycount: long")));
+ assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("myname: chararray, myage: int, mycount: long")));
//the schema of group is unchanged
lp = buildPlan("c = foreach b generate flatten(group) as (), COUNT(a) as mycount;");
foreach = (LOForEach) lp.getLeaves().get(0);
- assertTrue(foreach.getSchema().equals(getSchemaFromString("group::name: chararray, group::age: int, mycount: long")));
+ assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("group::name: chararray, group::age: int, mycount: long")));
//the first element in group, i.e., name is renamed as myname
lp = buildPlan("c = foreach b generate flatten(group) as myname, COUNT(a) as mycount;");
foreach = (LOForEach) lp.getLeaves().get(0);
- assertTrue(foreach.getSchema().equals(getSchemaFromString("myname: chararray, age: int, mycount: long")));
+ assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("myname: chararray, age: int, mycount: long")));
//group is renamed as mygroup
lp = buildPlan("c = foreach b generate group as mygroup, COUNT(a) as mycount;");
foreach = (LOForEach) lp.getLeaves().get(0);
- assertTrue(foreach.getSchema().equals(getSchemaFromString("mygroup:(name: chararray, age: int), mycount: long")));
+ assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("mygroup:(name: chararray, age: int), mycount: long")));
//group is renamed as mygroup and the first element is renamed as myname
lp = buildPlan("c = foreach b generate group as mygroup:(myname), COUNT(a) as mycount;");
foreach = (LOForEach) lp.getLeaves().get(0);
- assertTrue(foreach.getSchema().equals(getSchemaFromString("mygroup:(myname: chararray, age: int), mycount: long")));
+ assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("mygroup:(myname: chararray, age: int), mycount: long")));
//group is renamed as mygroup and the elements are renamed as myname and myage
lp = buildPlan("c = foreach b generate group as mygroup:(myname, myage), COUNT(a) as mycount;");
foreach = (LOForEach) lp.getLeaves().get(0);
- assertTrue(foreach.getSchema().equals(getSchemaFromString("mygroup:(myname: chararray, myage: int), mycount: long")));
+ assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("mygroup:(myname: chararray, myage: int), mycount: long")));
//group is renamed to mygroup as the tuple schema is empty
lp = buildPlan("c = foreach b generate group as mygroup:(), COUNT(a) as mycount;");
foreach = (LOForEach) lp.getLeaves().get(0);
- assertTrue(foreach.getSchema().equals(getSchemaFromString("mygroup:(name: chararray, age: int), mycount: long")));
+ assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("mygroup:(name: chararray, age: int), mycount: long")));
//setting the schema of flattened bag that has no schema with the user defined schema
buildPlan("c = load 'another_file';");
buildPlan("d = cogroup a by $0, c by $0;");
lp = buildPlan("e = foreach d generate flatten(DIFF(a, c)) as (x, y, z), COUNT(a) as mycount;");
foreach = (LOForEach) lp.getLeaves().get(0);
- assertTrue(foreach.getSchema().equals(getSchemaFromString("x: bytearray, y: bytearray, z: bytearray, mycount: long")));
+ assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("x: bytearray, y: bytearray, z: bytearray, mycount: long")));
//setting the schema of flattened bag that has no schema with the user defined schema
buildPlan("c = load 'another_file';");
buildPlan("d = cogroup a by $0, c by $0;");
lp = buildPlan("e = foreach d generate flatten(DIFF(a, c)) as (x: int, y: float, z), COUNT(a) as mycount;");
foreach = (LOForEach) lp.getLeaves().get(0);
- assertTrue(foreach.getSchema().equals(getSchemaFromString("x: int, y: float, z: bytearray, mycount: long")));
+ assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("x: int, y: float, z: bytearray, mycount: long")));
//setting the schema of flattened bag that has no schema with the user defined schema
buildPlan("c = load 'another_file';");
buildPlan("d = cogroup a by $0, c by $0;");
lp = buildPlan("e = foreach d generate flatten(DIFF(a, c)) as x, COUNT(a) as mycount;");
foreach = (LOForEach) lp.getLeaves().get(0);
- assertTrue(foreach.getSchema().equals(getSchemaFromString("x: bytearray, mycount: long")));
+ assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("x: bytearray, mycount: long")));
//setting the schema of flattened bag that has no schema with the user defined schema
buildPlan("c = load 'another_file';");
buildPlan("d = cogroup a by $0, c by $0;");
lp = buildPlan("e = foreach d generate flatten(DIFF(a, c)) as x: int, COUNT(a) as mycount;");
foreach = (LOForEach) lp.getLeaves().get(0);
- assertTrue(foreach.getSchema().equals(getSchemaFromString("x: int, mycount: long")));
+ assertTrue(foreach.getSchema().equals(Util.getSchemaFromString("x: int, mycount: long")));
}
@@ -1357,13 +1355,13 @@
LogicalPlan lp = buildPlan("c = foreach b {d = order a by $1; generate flatten(d), MAX(a.age) as max_age;};");
LOForEach foreach = (LOForEach) lp.getLeaves().get(0);
LOCogroup cogroup = (LOCogroup) lp.getPredecessors(foreach).get(0);
- Schema.FieldSchema bagFs = new Schema.FieldSchema("a", getSchemaFromString("name: bytearray, age: bytearray, gpa: bytearray"), DataType.BAG);
+ Schema.FieldSchema bagFs = new Schema.FieldSchema("a", Util.getSchemaFromString("name: bytearray, age: bytearray, gpa: bytearray"), DataType.BAG);
Schema.FieldSchema groupFs = new Schema.FieldSchema("group", DataType.BYTEARRAY);
Schema cogroupExpectedSchema = new Schema();
cogroupExpectedSchema.add(groupFs);
cogroupExpectedSchema.add(bagFs);
assertTrue(Schema.equals(cogroup.getSchema(), cogroupExpectedSchema, false, false));
- assertTrue(Schema.equals(foreach.getSchema(), getSchemaFromString("name: bytearray, age: bytearray, gpa: bytearray, max_age: double"), false, true));
+ assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("name: bytearray, age: bytearray, gpa: bytearray, max_age: double"), false, true));
}
@Test
@@ -1415,23 +1413,23 @@
lp = buildPlan("b = foreach a generate 1;");
foreach = (LOForEach) lp.getLeaves().get(0);
- assertTrue(Schema.equals(foreach.getSchema(), getSchemaFromString("x: int"), false, true));
+ assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("x: int"), false, true));
lp = buildPlan("b = foreach a generate 1L;");
foreach = (LOForEach) lp.getLeaves().get(0);
- assertTrue(Schema.equals(foreach.getSchema(), getSchemaFromString("x: long"), false, true));
+ assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("x: long"), false, true));
lp = buildPlan("b = foreach a generate 1.0;");
foreach = (LOForEach) lp.getLeaves().get(0);
- assertTrue(Schema.equals(foreach.getSchema(), getSchemaFromString("x: double"), false, true));
+ assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("x: double"), false, true));
lp = buildPlan("b = foreach a generate 1.0f;");
foreach = (LOForEach) lp.getLeaves().get(0);
- assertTrue(Schema.equals(foreach.getSchema(), getSchemaFromString("x: float"), false, true));
+ assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("x: float"), false, true));
lp = buildPlan("b = foreach a generate 'hello';");
foreach = (LOForEach) lp.getLeaves().get(0);
- assertTrue(Schema.equals(foreach.getSchema(), getSchemaFromString("x: chararray"), false, true));
+ assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("x: chararray"), false, true));
}
@Test
@@ -1443,31 +1441,31 @@
lp = buildPlan("b = foreach a generate (1);");
foreach = (LOForEach) lp.getLeaves().get(0);
- assertTrue(Schema.equals(foreach.getSchema(), getSchemaFromString("t:(x: int)"), false, true));
+ assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: int)"), false, true));
lp = buildPlan("b = foreach a generate (1L);");
foreach = (LOForEach) lp.getLeaves().get(0);
- assertTrue(Schema.equals(foreach.getSchema(), getSchemaFromString("t:(x: long)"), false, true));
+ assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: long)"), false, true));
lp = buildPlan("b = foreach a generate (1.0);");
foreach = (LOForEach) lp.getLeaves().get(0);
- assertTrue(Schema.equals(foreach.getSchema(), getSchemaFromString("t:(x: double)"), false, true));
+ assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: double)"), false, true));
lp = buildPlan("b = foreach a generate (1.0f);");
foreach = (LOForEach) lp.getLeaves().get(0);
- assertTrue(Schema.equals(foreach.getSchema(), getSchemaFromString("t:(x: float)"), false, true));
+ assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: float)"), false, true));
lp = buildPlan("b = foreach a generate ('hello');");
foreach = (LOForEach) lp.getLeaves().get(0);
- assertTrue(Schema.equals(foreach.getSchema(), getSchemaFromString("t:(x: chararray)"), false, true));
+ assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: chararray)"), false, true));
lp = buildPlan("b = foreach a generate ('hello', 1, 1L, 1.0f, 1.0);");
foreach = (LOForEach) lp.getLeaves().get(0);
- assertTrue(Schema.equals(foreach.getSchema(), getSchemaFromString("t:(x: chararray, y: int, z: long, a: float, b: double)"), false, true));
+ assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: chararray, y: int, z: long, a: float, b: double)"), false, true));
lp = buildPlan("b = foreach a generate ('hello', {(1), (1.0)});");
foreach = (LOForEach) lp.getLeaves().get(0);
- assertTrue(Schema.equals(foreach.getSchema(), getSchemaFromString("t:(x: chararray, ib:{it:(d: double)})"), false, true));
+ assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("t:(x: chararray, ib:{it:(d: double)})"), false, true));
}
@@ -1480,39 +1478,39 @@
lp = buildPlan("b = foreach a generate {(1, 'hello'), (2, 'world')};");
foreach = (LOForEach) lp.getLeaves().get(0);
- assertTrue(Schema.equals(foreach.getSchema(), getSchemaFromString("b:{t:(x: int, y: chararray)}"), false, true));
+ assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:(x: int, y: chararray)}"), false, true));
lp = buildPlan("b = foreach a generate {(1, 'hello'), (1L, 'world')};");
foreach = (LOForEach) lp.getLeaves().get(0);
- assertTrue(Schema.equals(foreach.getSchema(), getSchemaFromString("b:{t:(x: long, y: chararray)}"), false, true));
+ assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:(x: long, y: chararray)}"), false, true));
lp = buildPlan("b = foreach a generate {(1, 'hello'), (1.0f, 'world')};");
foreach = (LOForEach) lp.getLeaves().get(0);
- assertTrue(Schema.equals(foreach.getSchema(), getSchemaFromString("b:{t:(x: float, y: chararray)}"), false, true));
+ assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:(x: float, y: chararray)}"), false, true));
lp = buildPlan("b = foreach a generate {(1, 'hello'), (1.0, 'world')};");
foreach = (LOForEach) lp.getLeaves().get(0);
- assertTrue(Schema.equals(foreach.getSchema(), getSchemaFromString("b:{t:(x: double, y: chararray)}"), false, true));
+ assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:(x: double, y: chararray)}"), false, true));
lp = buildPlan("b = foreach a generate {(1L, 'hello'), (1.0f, 'world')};");
foreach = (LOForEach) lp.getLeaves().get(0);
- assertTrue(Schema.equals(foreach.getSchema(), getSchemaFromString("b:{t:(x: float, y: chararray)}"), false, true));
+ assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:(x: float, y: chararray)}"), false, true));
lp = buildPlan("b = foreach a generate {(1L, 'hello'), (1.0, 'world')};");
foreach = (LOForEach) lp.getLeaves().get(0);
- assertTrue(Schema.equals(foreach.getSchema(), getSchemaFromString("b:{t:(x: double, y: chararray)}"), false, true));
+ assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:(x: double, y: chararray)}"), false, true));
lp = buildPlan("b = foreach a generate {(1.0f, 'hello'), (1.0, 'world')};");
foreach = (LOForEach) lp.getLeaves().get(0);
- assertTrue(Schema.equals(foreach.getSchema(), getSchemaFromString("b:{t:(x: double, y: chararray)}"), false, true));
+ assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:(x: double, y: chararray)}"), false, true));
lp = buildPlan("b = foreach a generate {(1.0, 'hello'), (1.0f, 'world')};");
foreach = (LOForEach) lp.getLeaves().get(0);
- assertTrue(Schema.equals(foreach.getSchema(), getSchemaFromString("b:{t:(x: double, y: chararray)}"), false, true));
+ assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:(x: double, y: chararray)}"), false, true));
lp = buildPlan("b = foreach a generate {(1.0, 'hello', 3.14), (1.0f, 'world')};");
foreach = (LOForEach) lp.getLeaves().get(0);
- assertTrue(Schema.equals(foreach.getSchema(), getSchemaFromString("b:{t:()}"), false, true));
+ assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("b:{t:()}"), false, true));
}
@@ -1575,7 +1573,7 @@
lp = buildPlan("b = foreach a generate *;");
foreach = (LOForEach) lp.getLeaves().get(0);
- assertTrue(Schema.equals(foreach.getSchema(), getSchemaFromString("name: bytearray, age: bytearray, gpa: bytearray"), false, true));
+ assertTrue(Schema.equals(foreach.getSchema(), Util.getSchemaFromString("name: bytearray, age: bytearray, gpa: bytearray"), false, true));
}
@@ -1603,8 +1601,8 @@
lp = buildPlan("b = group a by *;");
cogroup = (LOCogroup) lp.getLeaves().get(0);
- Schema groupSchema = getSchemaFromString("name: bytearray, age: bytearray, gpa: bytearray");
- Schema bagASchema = getSchemaFromString("name: bytearray, age: bytearray, gpa: bytearray");
+ Schema groupSchema = Util.getSchemaFromString("name: bytearray, age: bytearray, gpa: bytearray");
+ Schema bagASchema = Util.getSchemaFromString("name: bytearray, age: bytearray, gpa: bytearray");
Schema.FieldSchema groupFs = new Schema.FieldSchema("group", groupSchema, DataType.TUPLE);
Schema.FieldSchema bagAFs = new Schema.FieldSchema("a", bagASchema, DataType.BAG);
Schema expectedSchema = new Schema(groupFs);
@@ -1623,9 +1621,9 @@
lp = buildPlan("c = group a by *, b by *;");
cogroup = (LOCogroup) lp.getLeaves().get(0);
- Schema groupSchema = getSchemaFromString("name: bytearray, age: bytearray, gpa: bytearray");
- Schema bagASchema = getSchemaFromString("name: bytearray, age: bytearray, gpa: bytearray");
- Schema bagBSchema = getSchemaFromString("first_name: bytearray, enrol_age: bytearray, high_school_gpa: bytearray");
+ Schema groupSchema = Util.getSchemaFromString("name: bytearray, age: bytearray, gpa: bytearray");
+ Schema bagASchema = Util.getSchemaFromString("name: bytearray, age: bytearray, gpa: bytearray");
+ Schema bagBSchema = Util.getSchemaFromString("first_name: bytearray, enrol_age: bytearray, high_school_gpa: bytearray");
Schema.FieldSchema groupFs = new Schema.FieldSchema("group", groupSchema, DataType.TUPLE);
Schema.FieldSchema bagAFs = new Schema.FieldSchema("a", bagASchema, DataType.BAG);
Schema.FieldSchema bagBFs = new Schema.FieldSchema("b", bagBSchema, DataType.BAG);
@@ -1794,18 +1792,6 @@
}
- private Schema getSchemaFromString(String schemaString) throws ParseException {
- return getSchemaFromString(schemaString, DataType.BYTEARRAY);
- }
-
- private Schema getSchemaFromString(String schemaString, byte defaultType) throws ParseException {
- ByteArrayInputStream stream = new ByteArrayInputStream(schemaString.getBytes()) ;
- QueryParser queryParser = new QueryParser(stream) ;
- Schema schema = queryParser.TupleSchema() ;
- Schema.setSchemaDefaultType(schema, defaultType);
- return schema;
- }
-
private void printPlan(LogicalPlan lp) {
LOPrinter graphPrinter = new LOPrinter(System.err, lp);
System.err.println("Printing the logical plan");
Modified: hadoop/pig/branches/types/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/Util.java?rev=719348&r1=719347&r2=719348&view=diff
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/Util.java (original)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/Util.java Thu Nov 20 12:05:09 2008
@@ -17,6 +17,7 @@
*/
package org.apache.pig.test;
+import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
@@ -30,6 +31,9 @@
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.*;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.logicalLayer.parser.QueryParser;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
public class Util {
private static BagFactory mBagFactory = BagFactory.getInstance();
@@ -236,4 +240,16 @@
return "file:/"+encodeEscape(path);
return "file:"+path;
}
+
+ public static Schema getSchemaFromString(String schemaString) throws ParseException {
+ return Util.getSchemaFromString(schemaString, DataType.BYTEARRAY);
+ }
+
+ static Schema getSchemaFromString(String schemaString, byte defaultType) throws ParseException {
+ ByteArrayInputStream stream = new ByteArrayInputStream(schemaString.getBytes()) ;
+ QueryParser queryParser = new QueryParser(stream) ;
+ Schema schema = queryParser.TupleSchema() ;
+ Schema.setSchemaDefaultType(schema, defaultType);
+ return schema;
+ }
}
Modified: hadoop/pig/branches/types/test/org/apache/pig/test/utils/Identity.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/utils/Identity.java?rev=719348&r1=719347&r2=719348&view=diff
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/utils/Identity.java (original)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/utils/Identity.java Thu Nov 20 12:05:09 2008
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.pig.test.util;
+package org.apache.pig.test.utils;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;