You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/12/09 02:59:58 UTC
svn commit: r724576 - in /hadoop/pig/branches/types: ./
src/org/apache/pig/data/ src/org/apache/pig/impl/logicalLayer/
src/org/apache/pig/impl/logicalLayer/parser/
src/org/apache/pig/impl/logicalLayer/schema/ test/org/apache/pig/test/
Author: olga
Date: Mon Dec 8 17:59:58 2008
New Revision: 724576
URL: http://svn.apache.org/viewvc?rev=724576&view=rev
Log:
PIG-449: Schemas for bags should contain tuples all the time
Added:
hadoop/pig/branches/types/test/org/apache/pig/test/TestDataBagAccess.java
Modified:
hadoop/pig/branches/types/CHANGES.txt
hadoop/pig/branches/types/src/org/apache/pig/data/DataType.java
hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java
hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java
hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
hadoop/pig/branches/types/test/org/apache/pig/test/TestPigServer.java
Modified: hadoop/pig/branches/types/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/CHANGES.txt?rev=724576&r1=724575&r2=724576&view=diff
==============================================================================
--- hadoop/pig/branches/types/CHANGES.txt (original)
+++ hadoop/pig/branches/types/CHANGES.txt Mon Dec 8 17:59:58 2008
@@ -324,7 +324,10 @@
PIG-538: support for null constants (pradeepk via olgan)
- PIG-385: more null handling (pradeepl via olgan)
+ PIG-385: more null handling (pradeepk via olgan)
PIG-546: FilterFunc calls empty constructor when it should be calling
- parameterized constructor
+ parameterized constructor (sms via olgan)
+
+ PIG-449: Schemas for bags should contain tuples all the time (pradeepk via
+ olgan)
Modified: hadoop/pig/branches/types/src/org/apache/pig/data/DataType.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/data/DataType.java?rev=724576&r1=724575&r2=724576&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/data/DataType.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/data/DataType.java Mon Dec 8 17:59:58 2008
@@ -795,6 +795,10 @@
}
Schema.FieldSchema tupleFs = new Schema.FieldSchema(null, schema, TUPLE);
Schema bagSchema = new Schema(tupleFs);
+ // since this schema has tuple field schema which internally
+ // has a list of field schemas for the actual items in the bag
+ // an access to any field in the bag is a two level access
+ bagSchema.setTwoLevelAccessRequired(true);
return new Schema.FieldSchema(null, bagSchema, BAG);
}
default: {
Modified: hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java?rev=724576&r1=724575&r2=724576&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOForEach.java Mon Dec 8 17:59:58 2008
@@ -201,6 +201,30 @@
//i.e., flatten(A), flatten(A.x) and NOT
//flatten(B.(x,y,z))
Schema s = planFs.schema;
+ if(null != s && s.isTwoLevelAccessRequired()) {
+ // this is the case where the schema is that of
+ // a bag which has just one tuple fieldschema which
+ // in turn has a list of fieldschemas. The schema
+ // after flattening would consist of the fieldSchemas
+ // present in the tuple
+
+ // check that indeed we only have one field schema
+ // which is that of a tuple
+ if(s.getFields().size() != 1) {
+ throw new FrontendException("Expected a bag schema with a single " +
+ "element of type "+ DataType.findTypeName(DataType.TUPLE) +
+ " but got a bag schema with multiple elements.");
+ }
+ Schema.FieldSchema tupleFS = s.getField(0);
+ if(tupleFS.type != DataType.TUPLE) {
+ throw new FrontendException("Expected a bag schema with a single " +
+ "element of type "+ DataType.findTypeName(DataType.TUPLE) +
+ " but got an element of type " +
+ DataType.findTypeName(tupleFS.type));
+ }
+ s = tupleFS.schema;
+
+ }
if(null != s) {
for(int i = 0; i < s.size(); ++i) {
Schema.FieldSchema fs;
@@ -330,6 +354,8 @@
mSchema = null;
mIsSchemaComputed = false;
throw fee;
+ } catch (ParseException e) {
+ throw new FrontendException(e);
}
}
//check for duplicate column names and throw an error if there are duplicates
Modified: hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java?rev=724576&r1=724575&r2=724576&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOProject.java Mon Dec 8 17:59:58 2008
@@ -234,7 +234,36 @@
if(null != expOpFs) {
Schema s = expOpFs.schema;
if(null != s) {
- Schema.FieldSchema fs = s.getField(mProjection.get(0));
+ Schema.FieldSchema fs;
+ if(s.isTwoLevelAccessRequired()) {
+ // this is the case where the schema is that of
+ // a bag which has just one tuple fieldschema which
+ // in turn has a list of fieldschemas. So the field
+ // schema we are trying to construct would be of the
+ // item we are trying to project inside the tuple
+ // fieldschema - because currently when we say b.i where
+ // b is a bag, we are trying to access the item i
+ // present in the tuple in the bag.
+
+ // check that indeed we only have one field schema
+ // which is that of a tuple
+ if(s.getFields().size() != 1) {
+ throw new FrontendException("Expected a bag schema with a single " +
+ "element of type "+ DataType.findTypeName(DataType.TUPLE) +
+ " but got a bag schema with multiple elements.");
+ }
+ Schema.FieldSchema tupleFS = s.getField(0);
+ if(tupleFS.type != DataType.TUPLE) {
+ throw new FrontendException("Expected a bag schema with a single " +
+ "element of type "+ DataType.findTypeName(DataType.TUPLE) +
+ " but got an element of type " +
+ DataType.findTypeName(tupleFS.type));
+ }
+ fs = tupleFS.schema.getField(mProjection.get(0));
+ } else {
+ // normal single level access
+ fs = s.getField(mProjection.get(0));
+ }
mFieldSchema = new Schema.FieldSchema(fs);
mFieldSchema.setParent(fs.canonicalName, expressionOperator);
} else {
Modified: hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=724576&r1=724575&r2=724576&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Mon Dec 8 17:59:58 2008
@@ -2858,6 +2858,10 @@
( t1 = <IDENTIFIER> ) [LOOKAHEAD(2) ":" <BAG> | ":"] "{" (fs = SchemaTuple() | {} {fs = new Schema.FieldSchema(null, new Schema());}) "}"
{
s = new Schema(fs);
+ // since this schema has tuple field schema which internally
+ // has a list of field schemas for the actual items in the bag
+ // an access to any field in the bag is a two level access
+ s.setTwoLevelAccessRequired(true);
if (null != t1) {
log.debug("BAG alias " + t1.image);
fs = new Schema.FieldSchema(t1.image, s, DataType.BAG);
Modified: hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java?rev=724576&r1=724575&r2=724576&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java Mon Dec 8 17:59:58 2008
@@ -556,6 +556,28 @@
private Map<String, FieldSchema> mAliases;
private MultiMap<String, String> mFieldSchemas;
private static Log log = LogFactory.getLog(Schema.class);
+ // In bags which have a schema with a tuple which contains
+ // the fields present in it, if we access the second field (say)
+ // we are actually trying to access the second field in the
+ // tuple in the bag. This is currently true for two cases:
+ // 1) bag constants - the schema of bag constant has a tuple
+ // which internally has the actual elements
+ // 2) When bags are loaded from input data, if the user
+ // specifies a schema with the "bag" type, he has to specify
+ // the bag as containing a tuple with the actual elements in
+ // the schema declaration. However in both the cases above,
+ // the user can still say b.i where b is the bag and i is
+ // an element in the bag's tuple schema. So in these cases,
+ // the access should translate to a lookup for "i" in the
+ // tuple schema present in the bag. To indicate this, the
+ // flag below is used. It is false by default because,
+ // currently we use bag as the type for relations. However
+ // the schema of a relation does NOT have a tuple fieldschema
+ // with items in it. Instead, the schema directly has the
+ // field schema of the items. So for a relation "b", the
+ // above b.i access would be a direct single level access
+ // of i in b's schema. This is treated as the "default" case
+ private boolean twoLevelAccessRequired = false;
public Schema() {
mFields = new ArrayList<FieldSchema>();
@@ -604,6 +626,7 @@
public Schema(Schema s) {
if(null != s) {
+ twoLevelAccessRequired = s.twoLevelAccessRequired;
mFields = new ArrayList<FieldSchema>(s.size());
mAliases = new HashMap<String, FieldSchema>();
mFieldSchemas = new MultiMap<String, String>();
@@ -840,6 +863,7 @@
s.mFieldSchemas.put(newFs.canonicalName, mFieldSchemas.get(oldFsCanonicalName));
}
+ s.twoLevelAccessRequired = twoLevelAccessRequired;
return s;
}
@@ -962,23 +986,57 @@
* @return position of the FieldSchema.
*/
public int getPosition(String alias) throws FrontendException{
-
- FieldSchema fs = getField(alias);
-
- if (null == fs) {
- return -1;
- }
-
- log.debug("fs: " + fs);
- int index = -1;
- for(int i = 0; i < mFields.size(); ++i) {
- log.debug("mFields(" + i + "): " + mFields.get(i) + " alias: " + mFields.get(i).alias);
- if(fs == mFields.get(i)) {index = i;}
+ if(twoLevelAccessRequired) {
+ // this is the case where "this" schema is that of
+ // a bag which has just one tuple fieldschema which
+ // in turn has a list of fieldschemas. The alias supplied
+ // should be treated as an alias in the tuple's schema
+
+ // check that indeed we only have one field schema
+ // which is that of a tuple
+ if(mFields.size() != 1) {
+ throw new FrontendException("Expected a bag schema with a single " +
+ "element of type "+ DataType.findTypeName(DataType.TUPLE) +
+ " but got a bag schema with multiple elements.");
+ }
+ Schema.FieldSchema tupleFS = mFields.get(0);
+ if(tupleFS.type != DataType.TUPLE) {
+ throw new FrontendException("Expected a bag schema with a single " +
+ "element of type "+ DataType.findTypeName(DataType.TUPLE) +
+ " but got an element of type " +
+ DataType.findTypeName(tupleFS.type));
+ }
+
+ // check if the alias supplied is that of the tuple
+ // itself - then disallow it since we do not allow access
+ // to the tuple itself - we only allow access to the fields
+ // in the tuple
+ if(alias.equals(tupleFS.alias)) {
+ throw new FrontendException("Access to the tuple ("+ alias + ") of " +
+ "the bag is disallowed. Only access to the elements of " +
+ "the tuple in the bag is allowed.");
+ }
+
+ // all is good - get the position from the tuple's schema
+ return tupleFS.schema.getPosition(alias);
+ } else {
+ FieldSchema fs = getField(alias);
+
+ if (null == fs) {
+ return -1;
+ }
+
+ log.debug("fs: " + fs);
+ int index = -1;
+ for(int i = 0; i < mFields.size(); ++i) {
+ log.debug("mFields(" + i + "): " + mFields.get(i) + " alias: " + mFields.get(i).alias);
+ if(fs == mFields.get(i)) {index = i;}
+ }
+
+ log.debug("index: " + index);
+ return index;
+ //return mFields.indexOf(fs);
}
-
- log.debug("index: " + index);
- return index;
- //return mFields.indexOf(fs);
}
public void addAlias(String alias, FieldSchema fs) {
@@ -1435,7 +1493,9 @@
}
}
- return new Schema(outputList) ;
+ Schema s = new Schema(outputList) ;
+ s.setTwoLevelAccessRequired(other.twoLevelAccessRequired);
+ return s;
}
/**
@@ -1449,6 +1509,20 @@
FieldSchema.setFieldSchemaDefaultType(fs, t);
}
}
+
+ /**
+ * @return true if lookups by alias must be resolved inside this schema's single tuple field schema
+ */
+ public boolean isTwoLevelAccessRequired() {
+ return this.twoLevelAccessRequired;
+ }
+
+ /**
+ * @param twoLevelAccess whether lookups by alias go through the single tuple field schema
+ */
+ public void setTwoLevelAccessRequired(boolean twoLevelAccess) {
+ twoLevelAccessRequired = twoLevelAccess;
+ }
}
Added: hadoop/pig/branches/types/test/org/apache/pig/test/TestDataBagAccess.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/TestDataBagAccess.java?rev=724576&view=auto
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/TestDataBagAccess.java (added)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/TestDataBagAccess.java Mon Dec 8 17:59:58 2008
@@ -0,0 +1,271 @@
+/**
+ *
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.MultiMap;
+import org.junit.Before;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+/**
+ * Tests for PIG-449: bags whose schema holds a single tuple field schema
+ * describing the bag's items. Projections such as b.i and flatten(b) must
+ * resolve against that inner tuple schema, both for bag constants and for
+ * bags loaded with a declared bag{tuple(...)} schema.
+ */
+public class TestDataBagAccess extends TestCase {
+
+    MiniCluster cluster = MiniCluster.buildCluster();
+    private PigServer pigServer;
+
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+    }
+
+    /**
+     * Projecting mybag.i, mybag.d and mybag.c on a bag constant must look up
+     * each alias inside the bag's tuple schema; each projection yields a bag
+     * whose tuple holds the corresponding value with its declared type.
+     */
+    @Test
+    public void testBagConstantAccess() throws IOException, ExecException {
+        File input = Util.createInputFile("tmp", "",
+                new String[] {"sampledata\tnot_used"});
+        pigServer.registerQuery("a = load 'file:" + Util.encodeEscape(input.toString()) + "';");
+        pigServer.registerQuery("b = foreach a generate {(16, 4.0e-2, 'hello')} as mybag:{t:(i: int, d: double, c: chararray)};");
+        pigServer.registerQuery("c = foreach b generate mybag.i, mybag.d, mybag.c;");
+        Iterator<Tuple> it = pigServer.openIterator("c");
+        Tuple t = it.next();
+        Object[] results = new Object[] { Integer.valueOf(16), Double.valueOf(4.0e-2), "hello" };
+        Class<?>[] resultClasses = new Class<?>[] { Integer.class, Double.class, String.class };
+        assertEquals(results.length, t.size());
+        for (int i = 0; i < results.length; i++) {
+            DataBag bag = (DataBag)t.get(i);
+            Object value = bag.iterator().next().get(0);
+            assertEquals(results[i], value);
+            assertEquals(resultClasses[i], value.getClass());
+        }
+    }
+
+    /**
+     * Projecting the bag's tuple alias itself (mybag.t) is disallowed: only
+     * the elements of the tuple in the bag may be accessed.
+     */
+    @Test
+    public void testBagConstantAccessFailure() throws IOException, ExecException {
+        File input = Util.createInputFile("tmp", "",
+                new String[] {"sampledata\tnot_used"});
+        pigServer.registerQuery("a = load 'file:" + Util.encodeEscape(input.toString()) + "';");
+        pigServer.registerQuery("b = foreach a generate {(16, 4.0e-2, 'hello')} as mybag:{t:(i: int, d: double, c: chararray)};");
+        try {
+            pigServer.registerQuery("c = foreach b generate mybag.t;");
+            fail("Expected access to the tuple of the bag to be rejected");
+        } catch(IOException e) {
+            String msg = e.getMessage();
+            assertNotNull(msg);
+            assertTrue(msg.contains("Only access to the elements of " +
+                    "the tuple in the bag is allowed."));
+        }
+    }
+
+    /**
+     * Flattening a bag constant whose tuples hold two inner tuples must make
+     * those tuples (t1, t2) projectable by alias; rows are checked in order.
+     */
+    @Test
+    public void testBagConstantFlatten1() throws IOException, ExecException {
+        File input = Util.createInputFile("tmp", "",
+                new String[] {"sampledata\tnot_used"});
+        pigServer.registerQuery("A = load 'file:" + Util.encodeEscape(input.toString()) + "';");
+        pigServer.registerQuery("B = foreach A generate {(('p1-t1-e1', 'p1-t1-e2'),('p1-t2-e1', 'p1-t2-e2'))," +
+                "(('p2-t1-e1', 'p2-t1-e2'), ('p2-t2-e1', 'p2-t2-e2'))};");
+        pigServer.registerQuery("C = foreach B generate $0 as pairbag { pair: ( t1: (e1, e2), t2: (e1, e2) ) };");
+        pigServer.registerQuery("D = foreach C generate FLATTEN(pairbag);");
+        pigServer.registerQuery("E = foreach D generate t1.e2 as t1e2, t2.e1 as t2e1;");
+        Iterator<Tuple> it = pigServer.openIterator("E");
+        // We should get the following two tuples as the result:
+        // (p1-t1-e2,p1-t2-e1)
+        // (p2-t1-e2,p2-t2-e1)
+        Tuple t = it.next();
+        assertEquals("p1-t1-e2", (String)t.get(0));
+        assertEquals("p1-t2-e1", (String)t.get(1));
+        t = it.next();
+        assertEquals("p2-t1-e2", (String)t.get(0));
+        assertEquals("p2-t2-e1", (String)t.get(1));
+        assertFalse(it.hasNext());
+    }
+
+    /**
+     * Flattening a bag loaded with a declared bag{tuple(...)} schema must
+     * expose the tuple's fields both by plain alias (s, i) and by
+     * disambiguated alias (bg::s, bg::i).
+     */
+    @Test
+    public void testBagConstantFlatten2() throws IOException, ExecException {
+        File input = Util.createInputFile("tmp", "",
+                new String[] {"somestring\t10\t{(a,10),(b,20)}"});
+        pigServer.registerQuery("a = load 'file:" + Util.encodeEscape(input.toString()) + "' " +
+                "as (str:chararray, intval:int, bg:bag{t:tuple(s:chararray, i:int)});");
+        pigServer.registerQuery("b = foreach a generate str, intval, flatten(bg);");
+        Object[][] results = new Object[][] {
+                {"somestring", Integer.valueOf(10), "a", Integer.valueOf(10)},
+                {"somestring", Integer.valueOf(10), "b", Integer.valueOf(20)}};
+        Class<?>[] resultClasses = new Class<?>[] { String.class, Integer.class, String.class, Integer.class };
+        pigServer.registerQuery("c = foreach b generate str, intval, s, i;");
+        assertOrderedResults(pigServer.openIterator("c"), results, resultClasses);
+        // same rows again, projected through the disambiguated bg:: aliases
+        pigServer.registerQuery("c = foreach b generate str, intval, bg::s, bg::i;");
+        assertOrderedResults(pigServer.openIterator("c"), results, resultClasses);
+    }
+
+    /**
+     * Stores a grouped relation with each of BinStorage and PigStorage,
+     * reloads it with an explicit bag{tuple(...)} schema and checks that the
+     * bag contents, flatten(bg), alias-based projections and positional
+     * projections all round-trip correctly.
+     */
+    @Test
+    public void testBagStoreLoad() throws IOException, ExecException {
+        File input = Util.createInputFile("tmp", "",
+                new String[] {"a\tid1", "a\tid2", "a\tid3", "b\tid4", "b\tid5", "b\tid6"});
+        pigServer.registerQuery("a = load 'file:" + Util.encodeEscape(input.toString()) + "' " +
+                "as (s:chararray, id:chararray);");
+        pigServer.registerQuery("b = group a by s;");
+        // group key -> ids expected inside that group's bag
+        MultiMap<Object, Object> results = new MultiMap<Object, Object>();
+        results.put("a", "id1");
+        results.put("a", "id2");
+        results.put("a", "id3");
+        results.put("b", "id4");
+        results.put("b", "id5");
+        results.put("b", "id6");
+        // expected rows of flatten(bg) and its projections, in any order
+        List<Tuple> resultTuples = new ArrayList<Tuple>();
+        resultTuples.add(Util.createTuple(new String[] { "a", "a", "id1"}));
+        resultTuples.add(Util.createTuple(new String[] { "a", "a", "id2"}));
+        resultTuples.add(Util.createTuple(new String[] { "a", "a", "id3"}));
+        resultTuples.add(Util.createTuple(new String[] { "b", "b", "id4"}));
+        resultTuples.add(Util.createTuple(new String[] { "b", "b", "id5"}));
+        resultTuples.add(Util.createTuple(new String[] { "b", "b", "id6"}));
+        Class<?>[] loadStoreClasses = new Class<?>[] { BinStorage.class, PigStorage.class };
+        for (int i = 0; i < loadStoreClasses.length; i++) {
+            String loadStoreClass = loadStoreClasses[i].getName();
+            String output = "/pig/out/TestDataBagAccess-testBagStoreLoad-" +
+                    loadStoreClass + ".txt";
+            pigServer.deleteFile(output);
+            pigServer.store("b", output, loadStoreClass);
+            pigServer.registerQuery("c = load '" + output + "' using " + loadStoreClass + "() AS " +
+                    "(gp: chararray, bg:bag { t: tuple (sReLoaded: chararray, idReLoaded: chararray)});");
+            Iterator<Tuple> it = pigServer.openIterator("c");
+            int j = 0;
+            while(it.hasNext()) {
+                Tuple t = it.next();
+                Object groupKey = t.get(0);
+                DataBag groupBag = (DataBag)t.get(1);
+                Collection<Object> values = results.get(groupKey);
+                assertNotNull("unexpected group key " + groupKey, values);
+                // ids already seen in this bag; declared outside the loop
+                // below, otherwise duplicates could never be detected
+                HashMap<Object, Boolean> seen = new HashMap<Object, Boolean>();
+                Iterator<Tuple> bgIt = groupBag.iterator();
+                int k = 0;
+                while(bgIt.hasNext()) {
+                    Tuple bgt = bgIt.next();
+                    // the first col is the group by key
+                    assertTrue(bgt.get(0).equals(groupKey));
+                    // the second col is one of the "id" values associated
+                    // with this group by key and has not been seen before
+                    assertTrue(values.contains(bgt.get(1)));
+                    if(seen.containsKey(bgt.get(1))) {
+                        fail("LoadStoreClass used : " + loadStoreClass + " " +
+                                ", duplicate value (" + bgt.get(1) + ")");
+                    }
+                    seen.put(bgt.get(1), true);
+                    k++;
+                }
+                // check that we saw 3 tuples in each group bag
+                assertEquals(3, k);
+                j++;
+            }
+            // make sure we saw the right number of high level tuples
+            assertEquals(results.keySet().size(), j);
+
+            // d, e and f must each contain every tuple of resultTuples exactly once
+            pigServer.registerQuery("d = foreach c generate gp, flatten(bg);");
+            assertUnorderedUniqueResults(pigServer.openIterator("d"), resultTuples, loadStoreClass);
+            // same output as above - but projection based on aliases
+            pigServer.registerQuery("e = foreach d generate gp, sReLoaded, idReLoaded;");
+            assertUnorderedUniqueResults(pigServer.openIterator("e"), resultTuples, loadStoreClass);
+            // same result as above but projection based on position specifiers
+            pigServer.registerQuery("f = foreach d generate $0, $1, $2;");
+            assertUnorderedUniqueResults(pigServer.openIterator("f"), resultTuples, loadStoreClass);
+        }
+    }
+
+    /**
+     * Asserts that it yields exactly the rows of results, in order, with
+     * each field's runtime class matching the entry in resultClasses.
+     * @param it iterator over the relation under test
+     * @param results expected rows, in production order
+     * @param resultClasses expected runtime class of each field
+     * @throws ExecException propagated from Tuple.get
+     */
+    private void assertOrderedResults(Iterator<Tuple> it, Object[][] results,
+            Class<?>[] resultClasses) throws ExecException {
+        int i = 0;
+        while(it.hasNext()) {
+            Tuple t = it.next();
+            assertTrue("more rows than expected: " + t, i < results.length);
+            for (int j = 0; j < resultClasses.length; j++) {
+                assertEquals(results[i][j], t.get(j));
+                assertEquals(resultClasses[j], t.get(j).getClass());
+            }
+            i++;
+        }
+        assertEquals(results.length, i);
+    }
+
+    /**
+     * Asserts that it yields every tuple of expected exactly once, in any
+     * order, and nothing else.
+     * @param it iterator over the relation under test
+     * @param expected tuples that must each be produced exactly once
+     * @param loadStoreClass load/store function name, used in failure messages
+     */
+    private void assertUnorderedUniqueResults(Iterator<Tuple> it,
+            List<Tuple> expected, String loadStoreClass) {
+        HashMap<Tuple, Boolean> seen = new HashMap<Tuple, Boolean>();
+        int count = 0;
+        while(it.hasNext()) {
+            Tuple t = it.next();
+            assertTrue(expected.contains(t));
+            if(seen.containsKey(t)) {
+                fail("LoadStoreClass used : " + loadStoreClass + " " +
+                        ", duplicate tuple (" + t + ") encountered.");
+            }
+            seen.put(t, true);
+            count++;
+        }
+        // check we got expected number of tuples
+        assertEquals(expected.size(), count);
+    }
+}
Modified: hadoop/pig/branches/types/test/org/apache/pig/test/TestPigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/TestPigServer.java?rev=724576&r1=724575&r2=724576&view=diff
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/TestPigServer.java (original)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/TestPigServer.java Mon Dec 8 17:59:58 2008
@@ -38,6 +38,7 @@
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.junit.Before;
import org.junit.After;
@@ -551,7 +552,17 @@
InputStream fileWithStdOutContents = new DataInputStream( new BufferedInputStream( new FileInputStream(stdOutRedirectedFile)));
BufferedReader reader = new BufferedReader(new InputStreamReader(fileWithStdOutContents));
while ((s = reader.readLine()) != null) {
- assertTrue(s.equals("b: {site: chararray,count: int,itemCounts::itemCountsTuple: (type: chararray,typeCount: int,f: float,m: map[ ])}") == true);
+ // strip away the initial schema alias and the
+ // curlies surrounding the schema to construct
+ // the schema object from the schema string
+ s = s.replaceAll("^.*\\{", "");
+ s = s.replaceAll("\\}$", "");
+ Schema actual = Util.getSchemaFromString( s);
+ Schema expected = Util.getSchemaFromString(
+ "site: chararray,count: int," +
+ "itemCounts::type: chararray,itemCounts::typeCount: int," +
+ "itemCounts::f: float,itemCounts::m: map[ ]");
+ assertEquals(expected, actual);
}
}