You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/09/26 23:20:32 UTC
svn commit: r699504 - in /incubator/pig/branches/types: ./
src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/
src/org/apache/pig/backend/hadoop/exe...
Author: olga
Date: Fri Sep 26 14:20:31 2008
New Revision: 699504
URL: http://svn.apache.org/viewvc?rev=699504&view=rev
Log:
PIG-443: illustrate
Added:
incubator/pig/branches/types/test/org/apache/pig/test/TestExampleGenerator.java
incubator/pig/branches/types/test/org/apache/pig/test/TestLocalPOSplit.java
incubator/pig/branches/types/test/org/apache/pig/test/TestPOCogroup.java
Modified:
incubator/pig/branches/types/CHANGES.txt
incubator/pig/branches/types/src/org/apache/pig/PigServer.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java
incubator/pig/branches/types/src/org/apache/pig/impl/plan/DependencyOrderWalker.java
incubator/pig/branches/types/src/org/apache/pig/impl/util/IdentityHashSet.java
incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java
incubator/pig/branches/types/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
Modified: incubator/pig/branches/types/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/CHANGES.txt?rev=699504&r1=699503&r2=699504&view=diff
==============================================================================
--- incubator/pig/branches/types/CHANGES.txt (original)
+++ incubator/pig/branches/types/CHANGES.txt Fri Sep 26 14:20:31 2008
@@ -253,3 +253,5 @@
PIG-462: LIMIT N should create one output file with N rows (shravanmn via
olgan)
+
+ PIG-443: Illustrate for the Types branch (shubham via olgan)
Modified: incubator/pig/branches/types/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/PigServer.java?rev=699504&r1=699503&r2=699504&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/PigServer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/PigServer.java Fri Sep 26 14:20:31 2008
@@ -41,6 +41,7 @@
import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
import org.apache.pig.builtin.BinStorage;
import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
@@ -65,6 +66,7 @@
import org.apache.pig.impl.util.PropertiesUtil;
import org.apache.pig.impl.logicalLayer.LODefine;
import org.apache.pig.impl.logicalLayer.LOStore;
+import org.apache.pig.pen.ExampleGenerator;
/**
*
@@ -610,6 +612,21 @@
// pigContext.getExecutionEngine().reclaimScope(this.scope);
}
+ public Map<LogicalOperator, DataBag> getExamples(String alias) {
+ //LogicalPlan plan = aliases.get(aliasOp.get(alias));
+ LogicalPlan plan = null;
+ try {
+ plan = clonePlan(alias);
+ } catch (IOException e) {
+ //Since the original script is parsed anyway, there should not be an
+ //error in this parsing. The only reason there can be an error is when
+ //the files being loaded in load don't exist anymore.
+ e.printStackTrace();
+ }
+ ExampleGenerator exgen = new ExampleGenerator(plan, pigContext);
+ return exgen.getExamples();
+ }
+
private ExecJob execute(String alias) throws FrontendException, ExecException {
ExecJob job = null;
// lp.explain(System.out, System.err);
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=699504&r1=699503&r2=699504&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Fri Sep 26 14:20:31 2008
@@ -59,15 +59,15 @@
public class LogToPhyTranslationVisitor extends LOVisitor {
- Map<LogicalOperator, PhysicalOperator> LogToPhyMap;
+ protected Map<LogicalOperator, PhysicalOperator> LogToPhyMap;
Random r = new Random();
- Stack<PhysicalPlan> currentPlans;
+ protected Stack<PhysicalPlan> currentPlans;
- PhysicalPlan currentPlan;
+ protected PhysicalPlan currentPlan;
- NodeIdGenerator nodeGen = NodeIdGenerator.getGenerator();
+ protected NodeIdGenerator nodeGen = NodeIdGenerator.getGenerator();
private Log log = LogFactory.getLog(getClass());
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=699504&r1=699503&r2=699504&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Fri Sep 26 14:20:31 2008
@@ -34,6 +34,7 @@
import org.apache.pig.impl.plan.Operator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.pen.util.LineageTracer;
/**
*
@@ -114,6 +115,8 @@
static protected DataBag dummyBag;
static protected Map dummyMap;
+
+ protected LineageTracer lineageTracer;
public PhysicalOperator(OperatorKey k) {
this(k, -1, null);
@@ -134,6 +137,10 @@
res = new Result();
}
+ public void setLineageTracer(LineageTracer lineage) {
+ this.lineageTracer = lineage;
+ }
+
public int getRequestedParallelism() {
return requestedParallelism;
}
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=699504&r1=699503&r2=699504&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Fri Sep 26 14:20:31 2008
@@ -22,6 +22,8 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.*;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.*;
import org.apache.pig.impl.plan.PlanVisitor;
import org.apache.pig.impl.plan.PlanWalker;
import org.apache.pig.impl.plan.VisitorException;
@@ -219,4 +221,15 @@
}
+ public void visitCogroup(POCogroup cogroup) {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void visitSplit(org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POSplit split) {
+ // TODO Auto-generated method stub
+
+ }
+
+
}
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java?rev=699504&r1=699503&r2=699504&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java Fri Sep 26 14:20:31 2008
@@ -30,6 +30,7 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.pen.util.ExampleTuple;
/**
* This is an implementation of the Filter operator. It has an Expression Plan
@@ -150,6 +151,11 @@
return res;
if (res.result != null && (Boolean) res.result == true) {
+ if(lineageTracer != null) {
+ ExampleTuple tIn = (ExampleTuple) inp.result;
+ lineageTracer.insert(tIn);
+ lineageTracer.union(tIn, tIn);
+ }
return inp;
}
}
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=699504&r1=699503&r2=699504&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Fri Sep 26 14:20:31 2008
@@ -23,6 +23,7 @@
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.pen.util.ExampleTuple;
public class POForEach extends PhysicalOperator {
@@ -49,6 +50,8 @@
//This is the template whcih contains tuples and is flattened out in CreateTuple() to generate the final output
Object[] data = null;
+ ExampleTuple tIn = null;
+
public POForEach(OperatorKey k) {
this(k,-1,null,null);
}
@@ -122,10 +125,17 @@
if(processingPlan){
while(true) {
res = processPlan();
- if(res.returnStatus==POStatus.STATUS_OK){
+ if(res.returnStatus==POStatus.STATUS_OK) {
+ if(lineageTracer != null && res.result != null) {
+ ExampleTuple tOut = new ExampleTuple((Tuple) res.result);
+ tOut.synthetic = tIn.synthetic;
+ lineageTracer.insert(tOut);
+ lineageTracer.union(tOut, tIn);
+ res.result = tOut;
+ }
return res;
}
- if(res.returnStatus==POStatus.STATUS_EOP){
+ if(res.returnStatus==POStatus.STATUS_EOP) {
processingPlan = false;
break;
}
@@ -156,6 +166,16 @@
res = processPlan();
processingPlan = true;
+
+ if(lineageTracer != null && res.result != null) {
+ //we check for res.result since that can also be null in the case of flatten
+ tIn = (ExampleTuple) inp.result;
+ ExampleTuple tOut = new ExampleTuple((Tuple) res.result);
+ tOut.synthetic = tIn.synthetic;
+ lineageTracer.insert(tOut);
+ lineageTracer.union(tOut, tIn);
+ res.result = tOut;
+ }
return res;
}
@@ -323,6 +343,11 @@
} else
out.append(in);
}
+
+ if(lineageTracer != null) {
+ ExampleTuple tOut = new ExampleTuple();
+ tOut.reference(out);
+ }
return out;
}
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java?rev=699504&r1=699503&r2=699504&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java Fri Sep 26 14:20:31 2008
@@ -36,6 +36,7 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.pen.util.ExampleTuple;
/**
* The load operator which is used in two ways:
@@ -135,6 +136,10 @@
}
else
res.returnStatus = POStatus.STATUS_OK;
+ if(lineageTracer != null) {
+ ExampleTuple tOut = new ExampleTuple((Tuple) res.result);
+ res.result = tOut;
+ }
} catch (IOException e) {
log.error("Received error from loader function: " + e);
return res;
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java?rev=699504&r1=699503&r2=699504&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java Fri Sep 26 14:20:31 2008
@@ -272,6 +272,10 @@
}
if (it.hasNext()) {
res.result = it.next();
+ if(lineageTracer != null) {
+ lineageTracer.insert((Tuple) res.result);
+ lineageTracer.union((Tuple)res.result, (Tuple)res.result);
+ }
res.returnStatus = POStatus.STATUS_OK;
} else {
res.returnStatus = POStatus.STATUS_EOP;
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java?rev=699504&r1=699503&r2=699504&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java Fri Sep 26 14:20:31 2008
@@ -29,6 +29,7 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.pen.util.ExampleTuple;
/**
* The union operator that combines the two inputs into a single
@@ -157,6 +158,11 @@
res.returnStatus = POStatus.STATUS_OK;
detachInput();
nextReturnEOP = true ;
+ if(lineageTracer != null) {
+ ExampleTuple tOut = (ExampleTuple) res.result;
+ lineageTracer.insert(tOut);
+ lineageTracer.union(tOut, tOut);
+ }
return res;
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java?rev=699504&r1=699503&r2=699504&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java Fri Sep 26 14:20:31 2008
@@ -17,6 +17,7 @@
*/
package org.apache.pig.impl.logicalLayer;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
@@ -55,4 +56,23 @@
lpp.print(out);
}
+
+// public String toString() {
+// if(mOps.size() == 0)
+// return "Empty Plan!";
+// else{
+// ByteArrayOutputStream baos = new ByteArrayOutputStream();
+// PrintStream ps = new PrintStream(baos);
+// try {
+// explain(baos, ps);
+// } catch (VisitorException e) {
+// // TODO Auto-generated catch block
+// e.printStackTrace();
+// } catch (IOException e) {
+// // TODO Auto-generated catch block
+// e.printStackTrace();
+// }
+// return baos.toString();
+// }
+// }
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/plan/DependencyOrderWalker.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/DependencyOrderWalker.java?rev=699504&r1=699503&r2=699504&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/plan/DependencyOrderWalker.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/plan/DependencyOrderWalker.java Fri Sep 26 14:20:31 2008
@@ -73,7 +73,7 @@
return new DependencyOrderWalker<O, P>(plan);
}
- private void doAllPredecessors(O node,
+ protected void doAllPredecessors(O node,
Set<O> seen,
Collection<O> fifo) throws VisitorException {
if (!seen.contains(node)) {
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/util/IdentityHashSet.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/util/IdentityHashSet.java?rev=699504&r1=699503&r2=699504&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/util/IdentityHashSet.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/util/IdentityHashSet.java Fri Sep 26 14:20:31 2008
@@ -19,6 +19,7 @@
package org.apache.pig.impl.util;
import java.util.*;
+import java.util.Map.Entry;
public class IdentityHashSet<E> implements Set<E> {
@@ -121,6 +122,23 @@
throw new UnsupportedOperationException("Unsupported operation on IdentityHashSet.");
}
-
+ public String toString() {
+ StringBuffer buf = new StringBuffer();
+ buf.append("{");
+
+ Iterator<Entry<E, Object>> i = map.entrySet().iterator();
+ boolean hasNext = i.hasNext();
+ while (hasNext) {
+ Entry<E, Object> e = i.next();
+ E key = e.getKey();
+ buf.append(key);
+ hasNext = i.hasNext();
+ if (hasNext)
+ buf.append(", ");
+ }
+
+ buf.append("}");
+ return buf.toString();
+ }
}
Modified: incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java?rev=699504&r1=699503&r2=699504&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java Fri Sep 26 14:20:31 2008
@@ -273,6 +273,11 @@
System.out.println(t);
}
}
+
+ protected void processIllustrate(String alias) throws IOException
+ {
+ mPigServer.getExamples(alias);
+ }
protected void processKill(String jobid) throws IOException
{
Modified: incubator/pig/branches/types/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj?rev=699504&r1=699503&r2=699504&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj (original)
+++ incubator/pig/branches/types/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj Fri Sep 26 14:20:31 2008
@@ -95,6 +95,8 @@
abstract protected void processPig(String cmd) throws IOException;
abstract protected void processRemove(String path) throws IOException;
+
+ abstract protected void processIllustrate(String alias) throws IOException;
static String unquote(String s)
{
@@ -137,6 +139,7 @@
TOKEN: {<REGISTER: "register">}
TOKEN: {<REMOVE: "rm">}
TOKEN: {<SET: "set">}
+TOKEN: {<ILLUSTRATE: "illustrate">}
// internal use commands
TOKEN: {<SCRIPT_DONE: "scriptDone">}
@@ -379,6 +382,10 @@
t1 = <IDENTIFIER>
{processDump(t1.image);}
|
+ <ILLUSTRATE>
+ t1 = <IDENTIFIER>
+ {processIllustrate(t1.image);}
+ |
<DESCRIBE>
t1 = <IDENTIFIER>
{processDescribe(t1.image);}
Added: incubator/pig/branches/types/test/org/apache/pig/test/TestExampleGenerator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestExampleGenerator.java?rev=699504&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestExampleGenerator.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestExampleGenerator.java Fri Sep 26 14:20:31 2008
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestExampleGenerator extends TestCase {
+
+ MiniCluster cluster = MiniCluster.buildCluster();
+ PigContext pigContext = new PigContext(ExecType.MAPREDUCE, cluster
+ .getProperties());
+
+ Random rand = new Random();
+ int MAX = 100;
+ String A, B;
+
+ {
+ try {
+ pigContext.connect();
+ } catch (ExecException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ File fileA, fileB;
+
+ fileA = File.createTempFile("dataA", ".dat");
+ fileB = File.createTempFile("dataB", ".dat");
+
+ writeData(fileA);
+ writeData(fileB);
+
+ A = "'" + FileLocalizer.hadoopify(fileA.toString(), pigContext) + "'";
+ B = "'" + FileLocalizer.hadoopify(fileB.toString(), pigContext) + "'";
+ System.out.println("A : " + A + "\n" + "B : " + B);
+ System.out.println("Test data created.");
+
+ }
+
+ private void writeData(File dataFile) throws Exception {
+ // File dataFile = File.createTempFile(name, ".dat");
+ FileOutputStream dat = new FileOutputStream(dataFile);
+
+ Random rand = new Random();
+
+ for (int i = 0; i < MAX; i++)
+ dat.write((rand.nextInt(10) + "\t" + rand.nextInt(10) + "\n")
+ .getBytes());
+
+ dat.close();
+ }
+
+ @Test
+ public void testFilter() throws Exception {
+
+ PigServer pigserver = new PigServer(pigContext);
+
+ String query = "A = load " + A
+ + " using PigStorage() as (x : int, y : int);\n";
+ pigserver.registerQuery(query);
+ query = "B = filter A by x > 10;";
+ pigserver.registerQuery(query);
+ Map<LogicalOperator, DataBag> derivedData = pigserver.getExamples("B");
+
+ assertTrue(derivedData != null);
+
+ }
+
+ @Test
+ public void testForeach() throws ExecException, IOException {
+ PigServer pigServer = new PigServer(pigContext);
+
+ pigServer.registerQuery("A = load " + A
+ + " using PigStorage() as (x : int, y : int);");
+ pigServer.registerQuery("B = foreach A generate x + y as sum;");
+
+ Map<LogicalOperator, DataBag> derivedData = pigServer.getExamples("B");
+
+ assertTrue(derivedData != null);
+ }
+
+ @Test
+ public void testJoin() throws IOException, ExecException {
+ PigServer pigServer = new PigServer(pigContext);
+ pigServer.registerQuery("A1 = load " + A + " as (x, y);");
+ pigServer.registerQuery("B1 = load " + B + " as (x, y);");
+
+ pigServer.registerQuery("E = join A1 by x, B1 by x;");
+
+ Map<LogicalOperator, DataBag> derivedData = pigServer.getExamples("E");
+
+ assertTrue(derivedData != null);
+ }
+
+ @Test
+ public void testCogroupMultipleCols() throws Exception {
+
+ PigServer pigServer = new PigServer(pigContext);
+ pigServer.registerQuery("A = load " + A + " as (x, y);");
+ pigServer.registerQuery("B = load " + B + " as (x, y);");
+ pigServer.registerQuery("C = cogroup A by (x, y), B by (x, y);");
+ Map<LogicalOperator, DataBag> derivedData = pigServer.getExamples("C");
+
+ assertTrue(derivedData != null);
+ }
+
+ @Test
+ public void testCogroup() throws Exception {
+ PigServer pigServer = new PigServer(pigContext);
+ pigServer.registerQuery("A = load " + A + " as (x, y);");
+ pigServer.registerQuery("B = load " + B + " as (x, y);");
+ pigServer.registerQuery("C = cogroup A by x, B by x;");
+ Map<LogicalOperator, DataBag> derivedData = pigServer.getExamples("C");
+
+ assertTrue(derivedData != null);
+ }
+
+ @Test
+ public void testGroup() throws Exception {
+ PigServer pigServer = new PigServer(pigContext);
+ pigServer.registerQuery("A = load " + A.toString() + " as (x, y);");
+ pigServer.registerQuery("B = group A by x;");
+ Map<LogicalOperator, DataBag> derivedData = pigServer.getExamples("B");
+
+ assertTrue(derivedData != null);
+
+ }
+
+ @Test
+ public void testUnion() throws Exception {
+ PigServer pigServer = new PigServer(pigContext);
+ pigServer.registerQuery("A = load " + A.toString() + " as (x, y);");
+ pigServer.registerQuery("B = load " + B.toString() + " as (x, y);");
+ pigServer.registerQuery("C = union A, B;");
+ Map<LogicalOperator, DataBag> derivedData = pigServer.getExamples("C");
+
+ assertTrue(derivedData != null);
+ }
+
+}
Added: incubator/pig/branches/types/test/org/apache/pig/test/TestLocalPOSplit.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLocalPOSplit.java?rev=699504&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestLocalPOSplit.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestLocalPOSplit.java Fri Sep 26 14:20:31 2008
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.ExecType;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.local.executionengine.physicalLayer.LocalLogToPhyTranslationVisitor;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.LODefine;
+import org.apache.pig.impl.logicalLayer.LOLoad;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
+import org.apache.pig.impl.logicalLayer.PlanSetter;
+import org.apache.pig.impl.logicalLayer.validators.LogicalPlanValidationExecutor;
+import org.apache.pig.impl.plan.CompilationMessageCollector;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.pen.util.FunctionalLogicalOptimizer;
+import org.junit.Test;
+
+public class TestLocalPOSplit extends TestCase {
+
+ Random r = new Random();
+
+ Log log = LogFactory.getLog(getClass());
+
+ PigContext pigContext = new PigContext(ExecType.LOCAL, new Properties());
+
+ @Test
+ public void testSplit() throws IOException, VisitorException, ExecException {
+ pigContext.connect();
+ File datFile = File.createTempFile("tempA", ".dat");
+
+ FileOutputStream dat = new FileOutputStream(datFile);
+
+ for (int i = 0; i < 100; i++) {
+ String str = r.nextInt(10) + "\n";
+ dat.write(str.getBytes());
+
+ }
+
+ dat.close();
+
+ String query = "split (load '" + datFile.getAbsolutePath()
+ + "') into a if $0 == 2, b if $0 == 9, c if $0 == 7 ;";
+
+ LogicalPlan plan = buildPlan(query);
+ PhysicalPlan pp = buildPhysicalPlan(plan);
+
+ DataBag[] bag = new DataBag[pp.getLeaves().size()];
+
+ for (int i = 0; i < bag.length; i++) {
+ bag[i] = BagFactory.getInstance().newDefaultBag();
+ }
+
+ for (int i = 0; i < pp.getLeaves().size(); i++) {
+ Tuple t = null;
+ for (Result res = pp.getLeaves().get(i).getNext(t); res.returnStatus != POStatus.STATUS_EOP; res = pp
+ .getLeaves().get(i).getNext(t)) {
+ if (res.returnStatus == POStatus.STATUS_OK)
+ bag[i].add((Tuple) res.result);
+ }
+ }
+
+ // Depending on how the "maps" in the physical plan are
+ // built the leaves could be in different order between different runs.
+ // lets test the first tuple out of each leaf to
+ // 1) ensure the value was not seen before
+ // 2) all the remaining tuples from that leaf are same
+ // as the first value
+ Map<DataByteArray, Boolean> seen = new HashMap<DataByteArray, Boolean>();
+ seen.put(new DataByteArray("7".getBytes()), false);
+ seen.put(new DataByteArray("9".getBytes()), false);
+ seen.put(new DataByteArray("2".getBytes()), false);
+
+ for (int i = 0; i < bag.length; i++) {
+ DataByteArray firstValue = null;
+ Iterator<Tuple> it = bag[i].iterator();
+ if (it.hasNext()) {
+ // check that we have not seen this value before
+ Tuple t = it.next();
+ System.out.println(t);
+ firstValue = (DataByteArray) t.get(0);
+ assertFalse((Boolean) seen.get(firstValue));
+ seen.put(firstValue, true);
+
+ }
+ // check that all remaining tuples from this
+ // leaf have the same values as the first value
+ for (; it.hasNext();) {
+ Tuple t = it.next();
+ System.out.println(t);
+ assertEquals(t.get(0), firstValue);
+ }
+ }
+ }
+
+ public PhysicalPlan buildPhysicalPlan(LogicalPlan lp)
+ throws VisitorException {
+ LocalLogToPhyTranslationVisitor visitor = new LocalLogToPhyTranslationVisitor(
+ lp);
+ visitor.setPigContext(pigContext);
+ visitor.visit();
+ return visitor.getPhysicalPlan();
+ }
+
+ public LogicalPlan buildPlan(String query) {
+ return buildPlan(query, LogicalPlanBuilder.class.getClassLoader());
+ }
+
+ public LogicalPlan buildPlan(String query, ClassLoader cldr) {
+ LogicalPlanBuilder.classloader = cldr;
+
+ LogicalPlanBuilder builder = new LogicalPlanBuilder(pigContext); //
+
+ try {
+ LogicalPlan lp = builder.parse("Test-Plan-Builder", query, aliases,
+ logicalOpTable, aliasOp);
+ List<LogicalOperator> roots = lp.getRoots();
+
+ if (roots.size() > 0) {
+ for (LogicalOperator op : roots) {
+ if (!(op instanceof LOLoad) && !(op instanceof LODefine)) {
+ throw new Exception(
+ "Cannot have a root that is not the load or define operator. Found "
+ + op.getClass().getName());
+ }
+ }
+ }
+
+ System.err.println("Query: " + query);
+
+ // Just the top level roots and their children
+ // Need a recursive one to travel down the tree
+
+ for (LogicalOperator op : lp.getRoots()) {
+ System.err.println("Logical Plan Root: "
+ + op.getClass().getName() + " object " + op);
+
+ List<LogicalOperator> listOp = lp.getSuccessors(op);
+
+ if (null != listOp) {
+ Iterator<LogicalOperator> iter = listOp.iterator();
+ while (iter.hasNext()) {
+ LogicalOperator lop = iter.next();
+ System.err.println("Successor: "
+ + lop.getClass().getName() + " object " + lop);
+ }
+ }
+ }
+ lp = refineLogicalPlan(lp);
+ assertTrue(lp != null);
+ return lp;
+ } catch (IOException e) {
+ // log.error(e);
+ // System.err.println("IOException Stack trace for query: " +
+ // query);
+ // e.printStackTrace();
+ fail("IOException: " + e.getMessage());
+ } catch (Exception e) {
+ log.error(e);
+ // System.err.println("Exception Stack trace for query: " + query);
+ // e.printStackTrace();
+ fail(e.getClass().getName() + ": " + e.getMessage() + " -- "
+ + query);
+ }
+ return null;
+ }
+
+ private LogicalPlan refineLogicalPlan(LogicalPlan plan) {
+ PlanSetter ps = new PlanSetter(plan);
+ try {
+ ps.visit();
+
+ } catch (VisitorException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ // run through validator
+ CompilationMessageCollector collector = new CompilationMessageCollector();
+ FrontendException caught = null;
+ try {
+ LogicalPlanValidationExecutor validator = new LogicalPlanValidationExecutor(
+ plan, pigContext);
+ validator.validate(plan, collector);
+
+ FunctionalLogicalOptimizer optimizer = new FunctionalLogicalOptimizer(
+ plan);
+ optimizer.optimize();
+ } catch (FrontendException fe) {
+ // Need to go through and see what the collector has in it. But
+ // remember what we've caught so we can wrap it into what we
+ // throw.
+ caught = fe;
+ }
+
+ return plan;
+
+ }
+
+ Map<LogicalOperator, LogicalPlan> aliases = new HashMap<LogicalOperator, LogicalPlan>();
+ Map<OperatorKey, LogicalOperator> logicalOpTable = new HashMap<OperatorKey, LogicalOperator>();
+ Map<String, LogicalOperator> aliasOp = new HashMap<String, LogicalOperator>();
+}
Added: incubator/pig/branches/types/test/org/apache/pig/test/TestPOCogroup.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOCogroup.java?rev=699504&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPOCogroup.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOCogroup.java Fri Sep 26 14:20:31 2008
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORead;
+import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POCogroup;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.plan.OperatorKey;
+
+public class TestPOCogroup extends TestCase {
+ Random r = new Random();
+
+ public void testCogroup2Inputs() throws Exception {
+ DataBag bag1 = BagFactory.getInstance().newDefaultBag();
+ Tuple t = TupleFactory.getInstance().newTuple();
+ t.append(new Integer(2));
+ bag1.add(t);
+ t = TupleFactory.getInstance().newTuple();
+ t.append(new Integer(1));
+ bag1.add(t);
+ t = TupleFactory.getInstance().newTuple();
+ t.append(new Integer(1));
+ bag1.add(t);
+
+ DataBag bag2 = BagFactory.getInstance().newDefaultBag();
+ t = TupleFactory.getInstance().newTuple();
+ t.append(new Integer(2));
+ bag2.add(t);
+ t = TupleFactory.getInstance().newTuple();
+ t.append(new Integer(2));
+ bag2.add(t);
+ t = TupleFactory.getInstance().newTuple();
+ t.append(new Integer(1));
+ bag2.add(t);
+
+ PORead poread1 = new PORead(new OperatorKey("", r.nextLong()), bag1);
+ PORead poread2 = new PORead(new OperatorKey("", r.nextLong()), bag2);
+
+ List<PhysicalOperator> inputs1 = new LinkedList<PhysicalOperator>();
+ inputs1.add(poread1);
+
+ POProject prj1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+ prj1.setResultType(DataType.INTEGER);
+ PhysicalPlan p1 = new PhysicalPlan();
+ p1.add(prj1);
+ List<PhysicalPlan> in1 = new LinkedList<PhysicalPlan>();
+ in1.add(p1);
+ POLocalRearrange lr1 = new POLocalRearrange(new OperatorKey("", r
+ .nextLong()), -1, inputs1);
+ lr1.setPlans(in1);
+ lr1.setIndex(0);
+
+ List<PhysicalOperator> inputs2 = new LinkedList<PhysicalOperator>();
+ inputs2.add(poread2);
+
+ POProject prj2 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+ prj2.setResultType(DataType.INTEGER);
+ PhysicalPlan p2 = new PhysicalPlan();
+ p2.add(prj2);
+ List<PhysicalPlan> in2 = new LinkedList<PhysicalPlan>();
+ in2.add(p2);
+ POLocalRearrange lr2 = new POLocalRearrange(new OperatorKey("", r
+ .nextLong()), -1, inputs2);
+ lr2.setPlans(in2);
+ lr2.setIndex(1);
+
+ List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+ inputs.add(lr1);
+ inputs.add(lr2);
+
+ POCogroup poc = new POCogroup(new OperatorKey("", r.nextLong()), -1,
+ inputs);
+
+ List<Tuple> expected = new LinkedList<Tuple>();
+
+ Tuple t1 = TupleFactory.getInstance().newTuple();
+ t1.append(1);
+ DataBag b1 = BagFactory.getInstance().newDefaultBag();
+ Tuple temp = TupleFactory.getInstance().newTuple();
+ temp.append(1);
+ b1.add(temp);
+ b1.add(temp);
+ t1.append(b1);
+
+ DataBag b2 = BagFactory.getInstance().newDefaultBag();
+ b2.add(temp);
+ t1.append(b2);
+
+ expected.add(t1);
+
+ t1 = TupleFactory.getInstance().newTuple();
+ t1.append(2);
+ DataBag b3 = BagFactory.getInstance().newDefaultBag();
+ temp = TupleFactory.getInstance().newTuple();
+ temp.append(2);
+ b3.add(temp);
+ t1.append(b3);
+
+ DataBag b4 = BagFactory.getInstance().newDefaultBag();
+ b4.add(temp);
+ b4.add(temp);
+ t1.append(b4);
+
+ expected.add(t1);
+ // System.out.println(expected.get(0) + " " + expected.get(1));
+ List<Tuple> obtained = new LinkedList<Tuple>();
+ for (Result res = poc.getNext(t); res.returnStatus != POStatus.STATUS_EOP; res = poc
+ .getNext(t)) {
+ System.out.println(res.result);
+ obtained.add((Tuple) res.result);
+ assertTrue(expected.contains((Tuple) res.result));
+ }
+ assertEquals(expected.size(), obtained.size());
+ }
+
+ public void testCogroup1Input() throws ExecException {
+ DataBag input = BagFactory.getInstance().newDefaultBag();
+ Tuple t = TupleFactory.getInstance().newTuple();
+ t.append(1);
+ t.append(2);
+ input.add(t);
+ input.add(t);
+ Tuple t2 = TupleFactory.getInstance().newTuple();
+ t2.append(2);
+ t2.append(2);
+ Tuple t3 = TupleFactory.getInstance().newTuple();
+ t3.append(3);
+ t3.append(4);
+ input.add(t2);
+ input.add(t3);
+
+ PORead poread1 = new PORead(new OperatorKey("", r.nextLong()), input);
+ List<PhysicalOperator> inputs1 = new LinkedList<PhysicalOperator>();
+ inputs1.add(poread1);
+
+ POProject prj1 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
+ prj1.setResultType(DataType.INTEGER);
+ PhysicalPlan p1 = new PhysicalPlan();
+ p1.add(prj1);
+ List<PhysicalPlan> in1 = new LinkedList<PhysicalPlan>();
+ in1.add(p1);
+ POLocalRearrange lr1 = new POLocalRearrange(new OperatorKey("", r
+ .nextLong()), -1, inputs1);
+ lr1.setPlans(in1);
+ lr1.setIndex(0);
+
+ List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+ inputs.add(lr1);
+
+ List<Tuple> expected = new LinkedList<Tuple>();
+ Tuple tOut1 = TupleFactory.getInstance().newTuple();
+ tOut1.append(2);
+ DataBag t11 = BagFactory.getInstance().newDefaultBag();
+ t11.add(t);
+ t11.add(t);
+ t11.add(t2);
+ tOut1.append(t11);
+ expected.add(tOut1);
+ Tuple tOut2 = TupleFactory.getInstance().newTuple();
+ tOut2.append(4);
+ DataBag t22 = BagFactory.getInstance().newDefaultBag();
+ t22.add(t3);
+ tOut2.append(t22);
+ expected.add(tOut2);
+ POCogroup poc = new POCogroup(new OperatorKey("", r.nextLong()), -1,
+ inputs);
+ int count = 0;
+ for (Result res = poc.getNext(t); res.returnStatus != POStatus.STATUS_EOP; res = poc
+ .getNext(t)) {
+ System.out.println(res.result);
+ count++;
+ assertTrue(expected.contains(res.result));
+ }
+ assertEquals(expected.size(), count);
+ }
+
+}