You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2009/03/04 00:48:44 UTC
svn commit: r749846 - in /hadoop/pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/data/ src/org/apache/pig/impl/logicalLayer/parser/
src/org/apache/pig/impl/logicalLayer/schema/ test/org/apache/pig/test/
Author: pradeepkth
Date: Tue Mar 3 23:48:43 2009
New Revision: 749846
URL: http://svn.apache.org/viewvc?rev=749846&view=rev
Log:
PIG-577: outer join query loses name information (sms via pradeepkth)
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/Main.java
hadoop/pig/trunk/src/org/apache/pig/data/DataType.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java
hadoop/pig/trunk/test/org/apache/pig/test/TestSchema.java
hadoop/pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidator.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=749846&r1=749845&r2=749846&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Mar 3 23:48:43 2009
@@ -442,3 +442,5 @@
PIG-691: BinStorage skips tuples when ^A is present in data (pradeepkth
via sms)
+
+ PIG-577: outer join query loses name information (sms via pradeepkth)
Modified: hadoop/pig/trunk/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/Main.java?rev=749846&r1=749845&r2=749846&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/Main.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/Main.java Tue Mar 3 23:48:43 2009
@@ -75,7 +75,7 @@
boolean verbose = false;
boolean gruntCalled = false;
- String logFileName = validateLogFile(null, null);
+ String logFileName = null;
try {
BufferedReader pin = null;
@@ -236,6 +236,11 @@
configureLog4J(properties);
// create the context with the parameter
PigContext pigContext = new PigContext(execType, properties);
+
+ if(logFileName == null) {
+ logFileName = validateLogFile(null, null);
+ }
+
pigContext.getProperties().setProperty("pig.logfile", logFileName);
LogicalPlanBuilder.classloader = pigContext.createCl(null);
@@ -507,7 +512,7 @@
if(logFile.isDirectory()) {
if(logFile.canWrite()) {
try {
- logFileName += logFile.getCanonicalPath() + File.separator + defaultLogFileName;
+ logFileName = logFile.getCanonicalPath() + File.separator + defaultLogFileName;
} catch (IOException ioe) {
throw new AssertionError("Could not compute canonical path to the log file " + ioe.getMessage());
}
@@ -556,8 +561,8 @@
//revert to the current working directory
String currDir = System.getProperty("user.dir");
logFile = new File(currDir);
- if(logFile.canWrite()) {
- logFileName = currDir + File.separator + (logFileName == null? defaultLogFileName : logFileName);
+ logFileName = currDir + File.separator + (logFileName == null? defaultLogFileName : logFileName);
+ if(logFile.canWrite()) {
return logFileName;
}
throw new RuntimeException("Cannot write to log file: " + logFileName);
Modified: hadoop/pig/trunk/src/org/apache/pig/data/DataType.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DataType.java?rev=749846&r1=749845&r2=749846&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DataType.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DataType.java Tue Mar 3 23:48:43 2009
@@ -873,6 +873,7 @@
if((null == currSchema) || (currSchema.size() != schemaSize)) {
Schema.FieldSchema tupleFs = new Schema.FieldSchema(null, null, TUPLE);
Schema bagSchema = new Schema(tupleFs);
+ bagSchema.setTwoLevelAccessRequired(true);
return new Schema.FieldSchema(null, bagSchema, BAG);
}
schema = Schema.mergeSchema(schema, currSchema, false, false, false);
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=749846&r1=749845&r2=749846&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Tue Mar 3 23:48:43 2009
@@ -3065,6 +3065,10 @@
( <BAG> "{" (fs = TypeSchemaTuple() | {} {fs = new Schema.FieldSchema(null, new Schema());}) "}" )
{
s = new Schema(fs);
+ // since this schema has tuple field schema which internally
+ // has a list of field schemas for the actual items in the bag
+ // an access to any field in the bag is a two level access
+ s.setTwoLevelAccessRequired(true);
if (null != t1) {
log.debug("BAG alias " + t1.image);
fs = new Schema.FieldSchema(t1.image, s, DataType.BAG);
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/schema/Schema.java?rev=749846&r1=749845&r2=749846&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/schema/Schema.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/schema/Schema.java Tue Mar 3 23:48:43 2009
@@ -362,7 +362,7 @@
// Don't do the comparison if both embedded schemas are
// null. That will cause Schema.equals to return false,
// even though we want to view that as true.
- if (!(fschema.schema == null && fother.schema == null)) {
+ if (!(fschema.schema == null && fother.schema == null)) {
// compare recursively using schema
if (!Schema.equals(fschema.schema, fother.schema, false, relaxAlias)) {
return false ;
@@ -1155,6 +1155,32 @@
if (other == null) {
return false ;
}
+
+ /*
+ * Need to check for bags with schemas and bags with tuples that in turn have schemas.
+ * Retrieve the tuple schema of the bag if twoLevelAccessRequired
+ * Assuming that only bags exhibit this behavior and twoLevelAccessRequired is used
+ * with the right intentions
+ */
+ if(schema.isTwoLevelAccessRequired() || other.isTwoLevelAccessRequired()) {
+ if(schema.isTwoLevelAccessRequired()) {
+ try {
+ schema = schema.getField(0).schema;
+ } catch (FrontendException fee) {
+ return false;
+ }
+ }
+
+ if(other.isTwoLevelAccessRequired()) {
+ try {
+ other = other.getField(0).schema;
+ } catch (FrontendException fee) {
+ return false;
+ }
+ }
+
+ return Schema.equals(schema, other, relaxInner, relaxAlias);
+ }
if (schema.size() != other.size()) return false;
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=749846&r1=749845&r2=749846&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanBuilder.java Tue Mar 3 23:48:43 2009
@@ -1898,6 +1898,7 @@
DataType.TUPLE);
Schema bagSchema = new Schema(tupleFs);
+ bagSchema.setTwoLevelAccessRequired(true);
Schema.FieldSchema bagFs = new Schema.FieldSchema(
"bag_of_tokenTuples",bagSchema, DataType.BAG);
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestSchema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestSchema.java?rev=749846&r1=749845&r2=749846&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestSchema.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestSchema.java Tue Mar 3 23:48:43 2009
@@ -77,9 +77,9 @@
Schema schema2 = new Schema(list2) ;
Assert.assertTrue(Schema.equals(schema1, schema2, false, false)) ;
-
+
innerList2.get(1).alias = "pi" ;
-
+
Assert.assertFalse(Schema.equals(schema1, schema2, false, false)) ;
Assert.assertTrue(Schema.equals(schema1, schema2, false, true)) ;
@@ -567,5 +567,63 @@
// Compare
Assert.assertTrue(Schema.equals(mergedSchema, expected, false, false)) ;
}
+
+ @Test
+ public void testSchemaEqualTwoLevelAccess() throws Exception {
+
+ List<FieldSchema> innerList1 = new ArrayList<FieldSchema>() ;
+ innerList1.add(new FieldSchema("11a", DataType.INTEGER)) ;
+ innerList1.add(new FieldSchema("11b", DataType.LONG)) ;
+
+ List<FieldSchema> innerList2 = new ArrayList<FieldSchema>() ;
+ innerList2.add(new FieldSchema("11a", DataType.INTEGER)) ;
+ innerList2.add(new FieldSchema("11b", DataType.LONG)) ;
+
+ Schema innerSchema1 = new Schema(innerList1) ;
+ Schema innerSchema2 = new Schema(innerList2) ;
+
+ List<FieldSchema> list1 = new ArrayList<FieldSchema>() ;
+ list1.add(new FieldSchema("1a", DataType.BYTEARRAY)) ;
+ list1.add(new FieldSchema("1b", innerSchema1)) ;
+ list1.add(new FieldSchema("1c", DataType.INTEGER)) ;
+
+ List<FieldSchema> list2 = new ArrayList<FieldSchema>() ;
+ list2.add(new FieldSchema("1a", DataType.BYTEARRAY)) ;
+ list2.add(new FieldSchema("1b", innerSchema2)) ;
+ list2.add(new FieldSchema("1c", DataType.INTEGER)) ;
+
+ Schema schema1 = new Schema(list1) ;
+ Schema schema2 = new Schema(list2) ;
+
+ Schema.FieldSchema bagFs1 = new Schema.FieldSchema("b", schema1, DataType.BAG);
+ Schema bagSchema1 = new Schema(bagFs1);
+
+ Schema.FieldSchema tupleFs = new Schema.FieldSchema("t", schema2, DataType.TUPLE);
+ Schema bagSchema = new Schema(tupleFs);
+ bagSchema.setTwoLevelAccessRequired(true);
+ Schema.FieldSchema bagFs2 = new Schema.FieldSchema("b", bagSchema, DataType.BAG);
+ Schema bagSchema2 = new Schema(bagFs2);
+
+
+ Assert.assertTrue(Schema.equals(bagSchema1, bagSchema2, false, false)) ;
+
+ innerList2.get(1).alias = "pi" ;
+
+ Assert.assertFalse(Schema.equals(bagSchema1, bagSchema2, false, false)) ;
+ Assert.assertTrue(Schema.equals(bagSchema1, bagSchema2, false, true)) ;
+
+ innerList2.get(1).alias = "11b" ;
+ innerList2.get(1).type = DataType.BYTEARRAY ;
+
+ Assert.assertFalse(Schema.equals(bagSchema1, bagSchema2, false, false)) ;
+ Assert.assertTrue(Schema.equals(bagSchema1, bagSchema2, true, false)) ;
+
+ innerList2.get(1).type = DataType.LONG ;
+
+ Assert.assertTrue(Schema.equals(bagSchema1, bagSchema2, false, false)) ;
+
+ list2.get(0).type = DataType.CHARARRAY ;
+ Assert.assertFalse(Schema.equals(bagSchema1, bagSchema2, false, false)) ;
+ }
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidator.java?rev=749846&r1=749845&r2=749846&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidator.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestTypeCheckingValidator.java Tue Mar 3 23:48:43 2009
@@ -5557,28 +5557,28 @@
assertTrue(cast.getLoadFuncSpec().getClassName().startsWith("org.apache.pig.builtin.PigStorage"));
}
-
+
@Test
public void testBincond() throws Throwable {
planTester.buildPlan("a = load 'a' as (name: chararray, age: int, gpa: float);") ;
planTester.buildPlan("b = group a by name;") ;
LogicalPlan plan = planTester.buildPlan("c = foreach b generate (IsEmpty(a) ? " + TestBinCondFieldSchema.class.getName() + "(*): a) ;") ;
-
+
// validate
CompilationMessageCollector collector = new CompilationMessageCollector() ;
TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
typeValidator.validate(plan, collector) ;
-
+
printMessageCollector(collector) ;
printTypeGraph(plan) ;
planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
-
+
if (collector.hasError()) {
throw new AssertionError("Did not expect an error") ;
}
-
-
+
+
LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
Schema.FieldSchema charFs = new FieldSchema(null, DataType.CHARARRAY);
@@ -5598,7 +5598,35 @@
Schema expectedSchema = new Schema(bagFs);
assertTrue(Schema.equals(foreach.getSchema(), expectedSchema, false, true));
+
+ }
+ @Test
+ public void testBinCondForOuterJoin() throws Throwable {
+ planTester.buildPlan("a = LOAD 'student_data' AS (name: chararray, age: int, gpa: float);");
+ planTester.buildPlan("b = LOAD 'voter_data' AS (name: chararray, age: int, registration: chararray, contributions: float);");
+ planTester.buildPlan("c = COGROUP a BY name, b BY name;");
+ LogicalPlan plan = planTester.buildPlan("d = FOREACH c GENERATE group, flatten((not IsEmpty(a) ? a : (bag{tuple(chararray, int, float)}){(null, null, null)})), flatten((not IsEmpty(b) ? b : (bag{tuple(chararray, int, chararray, float)}){(null,null,null, null)}));");
+
+ // validate
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+ planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
+
+ if (collector.hasError()) {
+ throw new AssertionError("Expect no error") ;
+ }
+
+
+ LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
+ String expectedSchemaString = "mygroup: chararray,A::name: chararray,A::age: int,A::gpa: float,B::name: chararray,B::age: int,B::registration: chararray,B::contributions: float";
+ Schema expectedSchema = Util.getSchemaFromString(expectedSchemaString);
+ assertTrue(Schema.equals(foreach.getSchema(), expectedSchema, false, true));
+
}
/*