You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by th...@apache.org on 2010/08/27 16:51:51 UTC
svn commit: r990165 - in /hadoop/pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relational...
Author: thejas
Date: Fri Aug 27 14:51:50 2010
New Revision: 990165
URL: http://svn.apache.org/viewvc?rev=990165&view=rev
Log:
PIG-506 - patch PIG-506.3.patch with change suggested by Daniel in jira
changes to work with new logical plan and other fixes
Added:
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LONative.java
Modified:
hadoop/pig/trunk/build.xml
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PONative.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LONative.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/InputOutputFileVisitor.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalNodesVisitor.java
hadoop/pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java
Modified: hadoop/pig/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/build.xml?rev=990165&r1=990164&r2=990165&view=diff
==============================================================================
--- hadoop/pig/trunk/build.xml (original)
+++ hadoop/pig/trunk/build.xml Fri Aug 27 14:51:50 2010
@@ -626,7 +626,6 @@
<exclude name="**/TestOrderBy.java" />
<exclude name="**/TestOrderBy2.java" />
<exclude name="**/TestPi.java" />
- <exclude name="**/TestNativeMapReduce.java" />
<exclude name="**/nightly/**" />
<!-- <exclude name="**/pigunit/**" /> -->
<exclude name="**/${exclude.testcase}.java" if="exclude.testcase" />
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=990165&r1=990164&r2=990165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Fri Aug 27 14:51:50 2010
@@ -68,6 +68,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartitionRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSortedDistinct;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
@@ -149,9 +150,6 @@ public class MRCompiler extends PhyPlanV
//The output of compiling the inputs
MapReduceOper[] compiledInputs = null;
- //Mapping of which MapReduceOper a store belongs to.
- Map<POStore, MapReduceOper> storeToMapReduceMap;
-
//The split operators seen till now. If not
//maintained they will haunt you.
//During the traversal a split is the only
@@ -199,7 +197,6 @@ public class MRCompiler extends PhyPlanV
}
scope = roots.get(0).getOperatorKey().getScope();
messageCollector = new CompilationMessageCollector() ;
- storeToMapReduceMap = new HashMap<POStore, MapReduceOper>();
phyToMROpMap = new HashMap<PhysicalOperator, MapReduceOper>();
}
@@ -261,9 +258,17 @@ public class MRCompiler extends PhyPlanV
}
}
+ // get all stores and nativeMR operators, sort them by operator id,
+ // and compile their plans
List<POStore> stores = PlanHelper.getStores(plan);
- for (POStore store: stores) {
- compile(store);
+ List<PONative> nativeMRs= PlanHelper.getNativeMRs(plan);
+ List<PhysicalOperator> ops = new ArrayList<PhysicalOperator>(stores.size() + nativeMRs.size());
+ ops.addAll(stores);
+ ops.addAll(nativeMRs);
+ Collections.sort(ops);
+
+ for (PhysicalOperator op : ops) {
+ compile(op);
}
// I'm quite certain this is not the best way to do this. The issue
@@ -313,7 +318,11 @@ public class MRCompiler extends PhyPlanV
//store them away so that we can use them for compiling
//op.
List<PhysicalOperator> predecessors = plan.getPredecessors(op);
- if (predecessors != null && predecessors.size() > 0) {
+ if(op instanceof PONative){
+ // the predecessor (store) has already been processed
+ // don't process it again
+ }
+ else if (predecessors != null && predecessors.size() > 0) {
// When processing an entire script (multiquery), we can
// get into a situation where a load has
// predecessors. This means that it depends on some store
@@ -331,9 +340,12 @@ public class MRCompiler extends PhyPlanV
}
PhysicalOperator p = predecessors.get(0);
- if (!(p instanceof POStore)) {
+ MapReduceOper oper = null;
+ if(p instanceof POStore || p instanceof PONative){
+ oper = phyToMROpMap.get(p);
+ }else{
int errCode = 2126;
- String msg = "Predecessor of load should be a store. Got "+p.getClass();
+ String msg = "Predecessor of load should be a store or mapreduce operator. Got "+p.getClass();
throw new PlanException(msg, errCode, PigException.BUG);
}
@@ -341,8 +353,6 @@ public class MRCompiler extends PhyPlanV
curMROp = getMROp();
curMROp.mapPlan.add(op);
MRPlan.add(curMROp);
-
- MapReduceOper oper = storeToMapReduceMap.get((POStore)p);
plan.disconnect(op, p);
MRPlan.connect(oper, curMROp);
@@ -760,39 +770,13 @@ public class MRCompiler extends PhyPlanV
@Override
public void visitNative(PONative op) throws VisitorException{
// We will explode the native operator here to add a new MROper for native Mapreduce job
- // We will also add respective Load and Store operators for this native job
try{
- POStore st = op.getInnerStore();
- st.setAlias(op.getAlias());
- st.setIsTmpStore(true);
- st.setParentPlan(mPlan);
- mPlan.add(st);
-
- PhysicalOperator pred = mPlan.getPredecessors(op).get(0);
- mPlan.disconnect(pred, op);
- mPlan.connect(pred, st);
-
- nonBlocking(st);
-
- MapReduceOper prevMROp = phyToMROpMap.get(pred);
- storeToMapReduceMap.put(st, prevMROp);
- phyToMROpMap.put(st, prevMROp);
- if (st.getSFile()!=null && st.getSFile().getFuncSpec()!=null)
- curMROp.UDFs.add(st.getSFile().getFuncSpec().toString());
-
// add a map reduce boundary
- MapReduceOper nativeMR = getNativeMROp(op.getNativeMRjar(), op.getParams());
- MRPlan.add(nativeMR);
- MRPlan.connect(curMROp, nativeMR);
-
- // add one more map reduce boundary
- POLoad ld = op.getInnerLoad();
- ld.setAlias(op.getAlias());
- curMROp = getMROp();
- curMROp.mapPlan.add(ld);
- MRPlan.add(curMROp);
- MRPlan.connect(nativeMR, curMROp);
-
+ MapReduceOper nativeMROper = getNativeMROp(op.getNativeMRjar(), op.getParams());
+ MRPlan.add(nativeMROper);
+ MRPlan.connect(curMROp, nativeMROper);
+ phyToMROpMap.put(op, nativeMROper);
+ curMROp = nativeMROper;
}catch(Exception e){
int errCode = 2034;
String msg = "Error compiling operator " + op.getClass().getSimpleName();
@@ -803,7 +787,6 @@ public class MRCompiler extends PhyPlanV
@Override
public void visitStore(POStore op) throws VisitorException{
try{
- storeToMapReduceMap.put(op, curMROp);
nonBlocking(op);
phyToMROpMap.put(op, curMROp);
if (op.getSFile()!=null && op.getSFile().getFuncSpec()!=null)
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=990165&r1=990164&r2=990165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Fri Aug 27 14:51:50 2010
@@ -126,6 +126,7 @@ public class MapReduceLauncher extends L
PigStatsUtil.startCollection(pc, jobClient, jcc, mrp);
List<Job> failedJobs = new LinkedList<Job>();
+ List<NativeMapReduceOper> failedNativeMR = new LinkedList<NativeMapReduceOper>();
List<Job> completeFailedJobsInThisRun = new LinkedList<Job>();
List<Job> succJobs = new LinkedList<Job>();
JobControl jc;
@@ -136,7 +137,10 @@ public class MapReduceLauncher extends L
//create the exception handler for the job control thread
//and register the handler with the job control thread
JobControlThreadExceptionHandler jctExceptionHandler = new JobControlThreadExceptionHandler();
-
+
+ boolean stop_on_failure =
+ pc.getProperties().getProperty("stop.on.failure", "false").equals("true");
+
// jc is null only when mrp.size == 0
while(mrp.size() != 0) {
jc = jcc.compile(mrp, grpName);
@@ -152,14 +156,34 @@ public class MapReduceLauncher extends L
ScriptState.get().emitJobsSubmittedNotification(1);
natOp.runJob();
numMRJobsCompl++;
- double prog = ((double)numMRJobsCompl)/totalMRJobs;
- notifyProgress(prog, lastProg);
- lastProg = prog;
- mrp.remove(natOp);
- } catch (JobCreationException je) {
+ } catch (IOException e) {
+
mrp.trimBelow(natOp);
- mrp.remove(natOp);
+ failedNativeMR.add(natOp);
+
+ String msg = "Error running native mapreduce" +
+ " operator job :" + natOp.getJobId() + e.getMessage();
+
+ String stackTrace = getStackStraceStr(e);
+ LogUtils.writeLog(msg,
+ stackTrace,
+ pc.getProperties().getProperty("pig.logfile"),
+ log
+ );
+ log.info(msg);
+
+ if (stop_on_failure) {
+ int errCode = 6017;
+
+ throw new ExecException(msg, errCode,
+ PigException.REMOTE_ENVIRONMENT);
+ }
+
}
+ double prog = ((double)numMRJobsCompl)/totalMRJobs;
+ notifyProgress(prog, lastProg);
+ lastProg = prog;
+ mrp.remove(natOp);
}
}
continue;
@@ -250,8 +274,7 @@ public class MapReduceLauncher extends L
}
if (!jc.getFailedJobs().isEmpty() ) {
- if ("true".equalsIgnoreCase(pc.getProperties().getProperty(
- "stop.on.failure", "false"))) {
+ if (stop_on_failure){
int errCode = 6017;
StringBuilder msg = new StringBuilder();
@@ -299,6 +322,10 @@ public class MapReduceLauncher extends L
boolean failed = false;
+ if(failedNativeMR.size() > 0){
+ failed = true;
+ }
+
// Look to see if any jobs failed. If so, we need to report that.
if (failedJobs != null && failedJobs.size() > 0) {
@@ -382,6 +409,13 @@ public class MapReduceLauncher extends L
return PigStatsUtil.getPigStats(ret);
}
+ private String getStackStraceStr(Throwable e) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintStream ps = new PrintStream(baos);
+ e.printStackTrace(ps);
+ return baos.toString();
+ }
+
/**
* Log the progress and notify listeners if there is sufficient progress
* @param prog current progress
@@ -553,10 +587,7 @@ public class MapReduceLauncher extends L
class JobControlThreadExceptionHandler implements Thread.UncaughtExceptionHandler {
public void uncaughtException(Thread thread, Throwable throwable) {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- PrintStream ps = new PrintStream(baos);
- throwable.printStackTrace(ps);
- jobControlExceptionStackTrace = baos.toString();
+ jobControlExceptionStackTrace = getStackStraceStr(throwable);
try {
jobControlException = getExceptionFromString(jobControlExceptionStackTrace);
} catch (Exception e) {
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=990165&r1=990164&r2=990165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Fri Aug 27 14:51:50 2010
@@ -1669,17 +1669,7 @@ public class LogToPhyTranslationVisitor
poNative.setAlias(loNative.getAlias());
poNative.setNativeMRjar(loNative.getNativeMRJar());
poNative.setParams(loNative.getParams());
- poNative.setResultType(loNative.getLoad().getType());
-
- currentPlans.push(currentPlan);
- currentPlan = new PhysicalPlan();
- PlanWalker<LogicalOperator, LogicalPlan> childWalker = mCurrentWalker
- .spawnChildWalker(loNative.getInnerPlan());
- pushWalker(childWalker);
- mCurrentWalker.walk(this);
- popWalker();
- poNative.setPhysInnerPlan(currentPlan);
- currentPlan = currentPlans.pop();
+ poNative.setResultType(DataType.BAG);
logToPhyMap.put(loNative, poNative);
currentPlan.add(poNative);
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PONative.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PONative.java?rev=990165&r1=990164&r2=990165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PONative.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PONative.java Fri Aug 27 14:51:50 2010
@@ -19,7 +19,6 @@ package org.apache.pig.backend.hadoop.ex
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.Utils;
@@ -28,40 +27,13 @@ public class PONative extends PhysicalOp
private static final long serialVersionUID = 1L;
- PhysicalPlan physInnerPlan;
String nativeMRjar;
String[] params;
- POStore innerStore;
- POLoad innerLoad;
-
+
public PONative(OperatorKey k) {
super(k);
}
- public PhysicalPlan getPhysInnerPlan() {
- return physInnerPlan;
- }
-
- public void setPhysInnerPlan(PhysicalPlan physInnerPlan) {
- this.physInnerPlan = physInnerPlan;
- for(PhysicalOperator innerOp : physInnerPlan) {
- if(innerOp instanceof POStore) {
- innerStore = (POStore) innerOp;
- }
- if(innerOp instanceof POLoad) {
- innerLoad = (POLoad) innerOp;
- }
- }
- }
-
- public POStore getInnerStore() {
- return innerStore;
- }
-
- public POLoad getInnerLoad() {
- return innerLoad;
- }
-
@Override
public void visit(PhyPlanVisitor v) throws VisitorException {
v.visitNative(this);
@@ -70,7 +42,8 @@ public class PONative extends PhysicalOp
@Override
public String name() {
return getAliasString() + "Native" + "('hadoop jar "
- + nativeMRjar + " " + Utils.getStringFromArray(params) + "')" + " - " + mKey.toString();
+ + nativeMRjar + " " + Utils.getStringFromArray(params) + "')"
+ + " - " + mKey.toString();
}
public String getNativeMRjar() {
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java?rev=990165&r1=990164&r2=990165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java Fri Aug 27 14:51:50 2010
@@ -50,7 +50,7 @@ public class PlanHelper {
* @return List of stores (could be empty)
*/
public static LinkedList<POStore> getStores(PhysicalPlan plan) throws VisitorException {
- LoadStoreFinder finder = new LoadStoreFinder(plan);
+ LoadStoreNativeFinder finder = new LoadStoreNativeFinder(plan);
finder.visit();
return finder.getStores();
@@ -62,11 +62,23 @@ public class PlanHelper {
* @return List of loads (could be empty)
*/
public static LinkedList<POLoad> getLoads(PhysicalPlan plan) throws VisitorException {
- LoadStoreFinder finder = new LoadStoreFinder(plan);
+ LoadStoreNativeFinder finder = new LoadStoreNativeFinder(plan);
finder.visit();
return finder.getLoads();
}
+
+ /**
+ * Get all the native MapReduce operators in the plan in the right dependency order
+ * @param plan
+ * @return List of native MapReduce operators (could be empty)
+ */
+ public static LinkedList<PONative> getNativeMRs(PhysicalPlan plan) throws VisitorException {
+ LoadStoreNativeFinder finder = new LoadStoreNativeFinder(plan);
+
+ finder.visit();
+ return finder.getNativeMRs();
+ }
/**
* Creates a relative path that can be used to build a temporary
@@ -85,14 +97,16 @@ public class PlanHelper {
}
}
- private static class LoadStoreFinder extends PhyPlanVisitor {
+ private static class LoadStoreNativeFinder extends PhyPlanVisitor {
private LinkedList<POLoad> loads;
private LinkedList<POStore> stores;
+ private LinkedList<PONative> nativeMRs;
- LoadStoreFinder(PhysicalPlan plan) {
+ LoadStoreNativeFinder(PhysicalPlan plan) {
super(plan, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(plan));
stores = new LinkedList<POStore>();
loads = new LinkedList<POLoad>();
+ nativeMRs = new LinkedList<PONative>();
}
@Override
@@ -111,7 +125,13 @@ public class PlanHelper {
super.visitLoad(load);
loads.add(load);
}
-
+
+ @Override
+ public void visitNative(PONative nativeMR) throws VisitorException {
+ super.visitNative(nativeMR);
+ nativeMRs.add(nativeMR);
+ }
+
public LinkedList<POStore> getStores() {
return stores;
}
@@ -119,5 +139,9 @@ public class PlanHelper {
public LinkedList<POLoad> getLoads() {
return loads;
}
+
+ public LinkedList<PONative> getNativeMRs(){
+ return nativeMRs;
+ }
}
}
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LONative.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LONative.java?rev=990165&r1=990164&r2=990165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LONative.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LONative.java Fri Aug 27 14:51:50 2010
@@ -31,34 +31,16 @@ public class LONative extends Relational
*/
private static final long serialVersionUID = 1L;
- String nativeMRJar;
- LogicalPlan innerPlan;
- String[] params = null;
- private LOLoad load;
- private LOStore store;
+ private String nativeMRJar;
+ private String[] params = null;
- public LONative(LogicalPlan plan, OperatorKey k, LogicalPlan innerLP,
- LOStore loStore, LOLoad loLoad, String nativeJar, String[] parameters) {
+ public LONative(LogicalPlan plan, OperatorKey k,
+ String nativeJar, String[] parameters) {
super(plan, k);
- innerPlan = innerLP;
nativeMRJar = nativeJar;
params = parameters;
- load = loLoad;
- store = loStore;
}
-
- public LogicalPlan getInnerPlan() {
- return innerPlan;
- }
-
- public LOLoad getLoad() {
- return load;
- }
-
- public LOStore getStore() {
- return store;
- }
-
+
@Override
public List<RequiredFields> getRelevantInputs(int output, int column)
throws FrontendException {
@@ -74,18 +56,14 @@ public class LONative extends Relational
}
@Override
- public Schema getSchema() throws FrontendException {
- return load.getSchema();
- }
-
- @Override
public void visit(LOVisitor v) throws VisitorException {
v.visit(this);
}
@Override
public String name() {
- return getAliasString() + "Native " + mKey.scope + "-" + mKey.id +" Store: " + store.name() + " Run: hadoop jar " + nativeMRJar + " " + Utils.getStringFromArray(params) + " Load: " + load.name();
+ return getAliasString() + "Native " + mKey.scope + "-" + mKey.id
+ + " Run: hadoop jar " + nativeMRJar + " " + Utils.getStringFromArray(params) ;
}
@Override
@@ -93,4 +71,9 @@ public class LONative extends Relational
return false;
}
+ @Override
+ public Schema getSchema() throws FrontendException {
+ return null;
+ }
+
}
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java?rev=990165&r1=990164&r2=990165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java Fri Aug 27 14:51:50 2010
@@ -108,17 +108,6 @@ public class LogicalOptimizer extends
mRules.add(new Rule<LogicalOperator, LogicalPlan>(rulePlan,
new PartitionFilterOptimizer(plan), "LoadPartitionFilterOptimizer"));
- // this one is ordered to be before other optimizations since later
- // optimizations may move the LOFilter that is looks for just after a
- // LOLoad
- rulePlan = new RulePlan();
- RuleOperator loNat = new RuleOperator(LONative.class,
- new OperatorKey(SCOPE, nodeIdGen.getNextNodeId(SCOPE)));
- rulePlan.add(loNat);
- mRules.add(new Rule<LogicalOperator, LogicalPlan>(rulePlan,
- new PartitionFilterOptimizer(plan), "NativePartitionFilterOptimizer"));
-
-
// Add type casting to plans where the schema has been declared (by
// user, data, or data catalog).
rulePlan = new RulePlan();
@@ -138,16 +127,6 @@ public class LogicalOptimizer extends
mRules.add(new Rule<LogicalOperator, LogicalPlan>(rulePlan, new TypeCastInserter(plan,
LOStream.class.getName()), "StreamTypeCastInserter"));
- // Add type casting to plans where the schema has been declared by
- // user in a statement with native operator.
-
- rulePlan = new RulePlan();
- RuleOperator loNative= new RuleOperator(LONative.class,
- new OperatorKey(SCOPE, nodeIdGen.getNextNodeId(SCOPE)));
- rulePlan.add(loNative);
- mRules.add(new Rule<LogicalOperator, LogicalPlan>(rulePlan, new TypeCastInserter(plan,
- LONative.class.getName()), "NativeTypeCastInserter"));
-
if(!turnAllRulesOff) {
// Push up limit wherever possible.
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java?rev=990165&r1=990164&r2=990165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PartitionFilterOptimizer.java Fri Aug 27 14:51:50 2010
@@ -35,7 +35,6 @@ import org.apache.pig.Expression.Column;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.LOFilter;
import org.apache.pig.impl.logicalLayer.LOLoad;
-import org.apache.pig.impl.logicalLayer.LONative;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.logicalLayer.PColFilterExtractor;
@@ -105,7 +104,7 @@ public class PartitionFilterOptimizer ex
"or empty list.";
throw new OptimizerException(msg, errCode, PigException.BUG);
}
- if(nodes.size() != 1|| !(nodes.get(0) instanceof LOLoad || nodes.get(0) instanceof LONative)) {
+ if(nodes.size() != 1|| !(nodes.get(0) instanceof LOLoad )) {
return false;
}
if (!alreadyChecked.add(nodes.get(0))) {
@@ -113,9 +112,7 @@ public class PartitionFilterOptimizer ex
}
if(nodes.get(0) instanceof LOLoad) {
loLoad = (LOLoad)nodes.get(0);
- } else {
- loLoad = ((LONative)nodes.get(0)).getLoad();
- }
+ }
List<LogicalOperator> sucs = mPlan.getSuccessors(loLoad);
if(sucs == null || sucs.size() != 1 || !(sucs.get(0) instanceof LOFilter)) {
return false;
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java?rev=990165&r1=990164&r2=990165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/SchemaRemover.java Fri Aug 27 14:51:50 2010
@@ -183,13 +183,7 @@ public class SchemaRemover extends LOVis
// don't have a way to recover it.
super.visit(load);
}
-
- @Override
- protected void visit(LONative load) throws VisitorException{
- // We treat this in similar way as load
- super.visit(load);
- }
-
+
@Override
protected void visit(LOStore store) throws VisitorException{
store.unsetSchema();
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java?rev=990165&r1=990164&r2=990165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java Fri Aug 27 14:51:50 2010
@@ -30,7 +30,6 @@ import org.apache.pig.impl.logicalLayer.
import org.apache.pig.impl.logicalLayer.LOCast;
import org.apache.pig.impl.logicalLayer.LOForEach;
import org.apache.pig.impl.logicalLayer.LOLoad;
-import org.apache.pig.impl.logicalLayer.LONative;
import org.apache.pig.impl.logicalLayer.LOProject;
import org.apache.pig.impl.logicalLayer.LOStream;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
@@ -73,9 +72,6 @@ public class TypeCastInserter extends Lo
if(LOLoad.class.getName().equals(operatorClassName)) {
determinedSchema = ((LOLoad)op).getDeterminedSchema();
}
- if(LONative.class.getName().equals(operatorClassName)) {
- determinedSchema = ((LONative)op).getLoad().getDeterminedSchema();
- }
for (int i = 0; i < fss.size(); i++) {
if (fss.get(i).type != DataType.BYTEARRAY) {
if(determinedSchema == null ||
@@ -128,16 +124,6 @@ public class TypeCastInserter extends Lo
}
return lo;
- } else if(LONative.class.getName().equals(operatorClassName)) {
- if(lo == null || !(lo instanceof LONative)) {
- int errCode = 2005;
- String msg = "Expected " + LONative.class.getSimpleName()
- + ", got "
- + (lo == null ? lo : lo.getClass().getSimpleName());
- throw new OptimizerException(msg, errCode, PigException.BUG);
- }
-
- return lo;
} else {
// we should never be called with any other operator class name
int errCode = 1034;
@@ -167,9 +153,6 @@ public class TypeCastInserter extends Lo
if(LOLoad.class.getName().equals(operatorClassName)) {
determinedSchema = ((LOLoad)lo).getDeterminedSchema();
}
- if(LONative.class.getName().equals(operatorClassName)) {
- determinedSchema = ((LONative)lo).getLoad().getDeterminedSchema();
- }
for (int i = 0; i < s.size(); i++) {
LogicalPlan p = new LogicalPlan();
genPlans.add(p);
@@ -199,8 +182,6 @@ public class TypeCastInserter extends Lo
StreamingCommand command = ((LOStream)lo).getStreamingCommand();
HandleSpec streamOutputSpec = command.getOutputSpec();
loadFuncSpec = new FuncSpec(streamOutputSpec.getSpec());
- } else if (lo instanceof LONative) {
- loadFuncSpec = ((LONative)lo).getLoad().getInputFile().getFuncSpec();
} else {
int errCode = 2006;
String msg = "TypeCastInserter invoked with an invalid operator class name: " + lo.getClass().getSimpleName();
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=990165&r1=990164&r2=990165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Fri Aug 27 14:51:50 2010
@@ -1509,8 +1509,7 @@ String StringList() :
//B = native ('mymr.jar' [, 'other.jar' ...]) A store into 'storeLocation' using storeFunc load 'loadLocation' using loadFunc ['params'];
LogicalOperator MapReduceClause(LogicalPlan lp) :
{
- LogicalPlan innerPlan = new LogicalPlan();
- LogicalOperator loLoad;
+ LogicalOperator loLoad;
LogicalOperator loStore;
LONative loNative;
Schema schema;
@@ -1537,11 +1536,19 @@ LogicalOperator MapReduceClause(LogicalP
]
<STORE>
- loStore = StoreClause(innerPlan)
+ loStore = StoreClause(lp)
+ {
+ ((LOStore)loStore).setTmpStore(true);
+ String inputAlias = ((LOStore)loStore).getAlias();
+ LogicalOperator input = mapAliasOp.get(inputAlias);
+ if (input == null)
+ throw new ParseException("Unable to find alias " + inputAlias);
+ lp.add(input);
+ lp.connect(input, loStore);
+
+ }
<LOAD>
- loLoad = LoadClause(innerPlan)
- // We do this so that Store gets a pred etc
- {innerPlan.connect(loLoad, loStore);}
+ loLoad = LoadClause(lp)
[ <AS>
(
LOOKAHEAD(2) "(" schema = TupleSchema() ")"
@@ -1565,19 +1572,13 @@ LogicalOperator MapReduceClause(LogicalP
}
]
{
- loNative = new LONative(lp, new OperatorKey(scope, getNextId()), innerPlan,
- (LOStore)loStore, (LOLoad)loLoad, nativeMRJar, params);
+ loNative = new LONative(lp, new OperatorKey(scope, getNextId()),
+ nativeMRJar, params);
lp.add(loNative);
-
- String inputAlias = ((LOStore)loStore).getAlias();
- LogicalOperator input = mapAliasOp.get(inputAlias);
- if (input == null)
- throw new ParseException("Unable to find alias " + inputAlias);
-
- lp.add(input);
- lp.connect(input, loNative);
- return loNative;
+ lp.connect(loStore, loNative);
+ lp.connect(loNative, loLoad);
+ return loLoad;
}
}
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/InputOutputFileVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/InputOutputFileVisitor.java?rev=990165&r1=990164&r2=990165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/InputOutputFileVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/validators/InputOutputFileVisitor.java Fri Aug 27 14:51:50 2010
@@ -117,10 +117,5 @@ public class InputOutputFileVisitor exte
throw new PlanValidationException(errMsg, errCode, pigCtx.getErrorSource(), ie);
}
}
-
- @Override
- protected void visit(LONative nativeMR) throws PlanValidationException{
- visit(nativeMR.getStore());
- }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java?rev=990165&r1=990164&r2=990165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java Fri Aug 27 14:51:50 2010
@@ -33,6 +33,7 @@ import org.apache.pig.impl.logicalLayer.
import org.apache.pig.impl.logicalLayer.LOJoin;
import org.apache.pig.impl.logicalLayer.LOLimit;
import org.apache.pig.impl.logicalLayer.LOLoad;
+import org.apache.pig.impl.logicalLayer.LONative;
import org.apache.pig.impl.logicalLayer.LOSort;
import org.apache.pig.impl.logicalLayer.LOSplit;
import org.apache.pig.impl.logicalLayer.LOSplitOutput;
@@ -393,6 +394,25 @@ public class LogicalPlanMigrationVistor
opsMap.put(distinct, newDistinct);
translateConnection(distinct, newDistinct);
}
+
+
+ /**
+ * Migrates an old-plan LONative: copies jar, params, alias, requested
+ * parallelism and custom partitioner, then registers the new operator.
+ */
+ public void visit(LONative nativeMR) throws VisitorException {
+ org.apache.pig.newplan.logical.relational.LONative migrated =
+ new org.apache.pig.newplan.logical.relational.LONative(
+ logicalPlan, nativeMR.getNativeMRJar(), nativeMR.getParams());
+ migrated.setAlias(nativeMR.getAlias());
+ migrated.setRequestedParallelism(nativeMR.getRequestedParallelism());
+ migrated.setCustomPartitioner(nativeMR.getCustomPartitioner());
+ logicalPlan.add(migrated);
+ opsMap.put(nativeMR, migrated);
+ translateConnection(nativeMR, migrated);
+ }
+
+
public void finish() {
for(org.apache.pig.newplan.logical.expression.LogicalExpression exp: scalarAliasMap.keySet()) {
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LONative.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LONative.java?rev=990165&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LONative.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LONative.java Fri Aug 27 14:51:50 2010
@@ -0,0 +1,134 @@
+package org.apache.pig.newplan.logical.relational;
+
+import java.util.Arrays;
+
+import org.apache.pig.impl.logicalLayer.FrontendException;
+
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.PlanVisitor;
+
+/**
+ * Operator of the new logical plan standing for a native map-reduce step.
+ * It stores the jar name and the parameter array it was given and keeps no
+ * load or store operator of its own.
+ *
+ * <p>Both values can be replaced later through the setters.
+ */
+public class LONative extends LogicalRelationalOperator {
+
+ /** Native map-reduce jar, as passed to the constructor or setter. */
+ private String nativeMRJar;
+
+ /** Parameter array, as passed to the constructor or setter. */
+ private String[] params;
+
+ /**
+ * Creates the operator under the name "LONative".
+ *
+ * @param plan plan handed to the superclass constructor
+ * @param nativeJar value stored as the native map-reduce jar
+ * @param parameters array stored as the params (kept by reference)
+ */
+ public LONative(OperatorPlan plan, String nativeJar, String[] parameters) {
+ super("LONative", plan);
+ this.params = parameters;
+ this.nativeMRJar = nativeJar;
+ }
+
+ /**
+ * This operator reports no schema.
+ *
+ * @return always {@code null}
+ */
+ @Override
+ public LogicalSchema getSchema() throws FrontendException {
+ return null;
+ }
+
+ /**
+ * Hands this operator to {@code visit(LONative)} of the given visitor.
+ *
+ * @param v visitor; has to be a {@link LogicalRelationalNodesVisitor}
+ * @throws FrontendException with code 2223 for any other kind of visitor
+ */
+ @Override
+ public void accept(PlanVisitor v) throws FrontendException {
+ if (v instanceof LogicalRelationalNodesVisitor) {
+ ((LogicalRelationalNodesVisitor)v).visit(this);
+ return;
+ }
+ throw new FrontendException("Expected LogicalPlanVisitor", 2223);
+ }
+
+ /**
+ * Compares this operator with another one.
+ *
+ * @param obj operator to compare against; may be {@code null}
+ * @return {@code true} if {@code obj} is this instance, or has the same
+ * class, an equal jar, equal params and passes {@code checkEquality}
+ * @throws FrontendException propagated from {@code checkEquality}, if it throws
+ */
+ @Override
+ public boolean isEqual(Operator obj) throws FrontendException {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ LONative that = (LONative) obj;
+
+ // null-safe comparison of the jar
+ boolean sameJar = (nativeMRJar == null)
+ ? that.nativeMRJar == null
+ : nativeMRJar.equals(that.nativeMRJar);
+ if (!sameJar) {
+ return false;
+ }
+ // element-wise comparison of the params
+ if (!Arrays.equals(params, that.params)) {
+ return false;
+ }
+
+ //check predecessors and schema
+ return checkEquality(that);
+ }
+
+ /**
+ * Returns the native map-reduce jar.
+ *
+ * @return the nativeMRJar
+ */
+ public String getNativeMRJar() {
+ return nativeMRJar;
+ }
+
+ /**
+ * Replaces the native map-reduce jar.
+ *
+ * @param nativeMRJar the nativeMRJar to set
+ */
+ public void setNativeMRJar(String nativeMRJar) {
+ this.nativeMRJar = nativeMRJar;
+ }
+
+ /**
+ * Returns the parameter array itself, not a copy.
+ *
+ * @return the params
+ */
+ public String[] getParams() {
+ return params;
+ }
+
+ /**
+ * Replaces the parameter array; the given array is kept by reference.
+ *
+ * @param params the params to set
+ */
+ public void setParams(String[] params) {
+ this.params = params;
+ }
+
+}
Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java?rev=990165&r1=990164&r2=990165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java Fri Aug 27 14:51:50 2010
@@ -47,6 +47,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
@@ -150,6 +151,42 @@ public class LogToPhyTranslationVisitor
// System.err.println("Exiting Load");
}
+
+ /**
+ * Translates LONative into a PONative with the same alias, jar and params and
+ * connects it after its first predecessor; a missing predecessor is error 2051.
+ */
+ @Override
+ public void visit(LONative loNative) throws FrontendException{
+ String scope = DEFAULT_SCOPE;
+ PONative poNative = new PONative(new OperatorKey(scope, nodeGen.getNextNodeId(scope)));
+ poNative.setAlias(loNative.getAlias());
+ poNative.setNativeMRjar(loNative.getNativeMRJar());
+ poNative.setParams(loNative.getParams());
+ poNative.setResultType(DataType.BAG);
+ logToPhyMap.put(loNative, poNative);
+ currentPlan.add(poNative);
+
+ List<Operator> op = loNative.getPlan().getPredecessors(loNative);
+ PhysicalOperator from;
+ // An empty list has no get(0); treat it like a missing predecessor.
+ if(op != null && !op.isEmpty()) {
+ from = logToPhyMap.get(op.get(0));
+ } else {
+ int errCode = 2051;
+ String msg = "Did not find a predecessor for Native." ;
+ throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG);
+ }
+
+ try {
+ currentPlan.connect(from, poNative);
+ } catch (PlanException e) {
+ int errCode = 2015;
+ String msg = "Invalid physical operators in the physical plan" ;
+ throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+ }
+ }
+
@Override
public void visit(LOFilter filter) throws FrontendException {
String scope = DEFAULT_SCOPE;
Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalNodesVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalNodesVisitor.java?rev=990165&r1=990164&r2=990165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalNodesVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalNodesVisitor.java Fri Aug 27 14:51:50 2010
@@ -89,4 +89,7 @@ public abstract class LogicalRelationalN
public void visit(LOStream loStream) throws FrontendException {
}
+
+ public void visit(LONative nativeMR) throws FrontendException{
+ } // empty body: does nothing unless a subclass overrides it
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java?rev=990165&r1=990164&r2=990165&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java Fri Aug 27 14:51:50 2010
@@ -17,13 +17,14 @@
*/
package org.apache.pig.test;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Random;
-import junit.framework.TestCase;
-
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.data.BagFactory;
@@ -34,13 +35,13 @@ import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
-public class TestNativeMapReduce extends TestCase {
+public class TestNativeMapReduce {
static MiniCluster cluster = MiniCluster.buildCluster();
private PigServer pigServer;
// the jar has been created using the source at
// http://svn.apache.org/repos/asf/hadoop/mapreduce/trunk/src/examples/org/apache/hadoop/examples/WordCount.java:816822
private String jarFileName = "test//org/apache/pig/test/data/TestWordCount.jar";
-
+ private String exp_msg_prefix = "Check if expected results contains: ";
TupleFactory mTf = TupleFactory.getInstance();
BagFactory mBf = BagFactory.getInstance();
@@ -50,9 +51,9 @@ public class TestNativeMapReduce extends
}*/
@Before
- @Override
public void setUp() throws Exception{
FileLocalizer.setR(new Random());
+ //FileLocalizer.setInitialized(false);
pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
//createWordCountJar();
@@ -96,17 +97,17 @@ public class TestNativeMapReduce extends
Iterator<Tuple> iter = pigServer.openIterator("C");
Tuple t;
- assertTrue(iter.hasNext());
+ assertTrue("iter.hasNext()",iter.hasNext());
t = iter.next();
- assertTrue(results.contains(t.toString()));
+ assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
- assertTrue(iter.hasNext());
+ assertTrue("iter.hasNext()",iter.hasNext());
t = iter.next();
- assertTrue(results.contains(t.toString()));
+ assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
- assertTrue(iter.hasNext());
+ assertTrue("iter.hasNext()",iter.hasNext());
t = iter.next();
- assertTrue(results.contains(t.toString()));
+ assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
assertFalse(iter.hasNext());
@@ -117,17 +118,17 @@ public class TestNativeMapReduce extends
// check in interactive mode
iter = pigServer.openIterator("B");
- assertTrue(iter.hasNext());
+ assertTrue("iter.hasNext()",iter.hasNext());
t = iter.next();
- assertTrue(results.contains(t.toString()));
+ assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
- assertTrue(iter.hasNext());
+ assertTrue("iter.hasNext()",iter.hasNext());
t = iter.next();
- assertTrue(results.contains(t.toString()));
+ assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
- assertTrue(iter.hasNext());
+ assertTrue("iter.hasNext()",iter.hasNext());
t = iter.next();
- assertTrue(results.contains(t.toString()));
+ assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
assertFalse(iter.hasNext());
@@ -173,17 +174,17 @@ public class TestNativeMapReduce extends
Iterator<Tuple> iter = pigServer.openIterator("C");
Tuple t;
- assertTrue(iter.hasNext());
+ assertTrue("iter.hasNext()",iter.hasNext());
t = iter.next();
- assertTrue(results.contains(t.toString()));
+ assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
- assertTrue(iter.hasNext());
+ assertTrue("iter.hasNext()",iter.hasNext());
t = iter.next();
- assertTrue(results.contains(t.toString()));
+ assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
- assertTrue(iter.hasNext());
+ assertTrue("iter.hasNext()",iter.hasNext());
t = iter.next();
- assertTrue(results.contains(t.toString()));
+ assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
assertFalse(iter.hasNext());
@@ -193,17 +194,17 @@ public class TestNativeMapReduce extends
// check in interactive mode
iter = pigServer.openIterator("B");
- assertTrue(iter.hasNext());
+ assertTrue("iter.hasNext()",iter.hasNext());
t = iter.next();
- assertTrue(results.contains(t.toString()));
+ assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
- assertTrue(iter.hasNext());
+ assertTrue("iter.hasNext()",iter.hasNext());
t = iter.next();
- assertTrue(results.contains(t.toString()));
+ assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
- assertTrue(iter.hasNext());
+ assertTrue("iter.hasNext()",iter.hasNext());
t = iter.next();
- assertTrue(results.contains(t.toString()));
+ assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
assertFalse(iter.hasNext());
@@ -245,33 +246,33 @@ public class TestNativeMapReduce extends
Iterator<Tuple> iter = pigServer.openIterator("C");
Tuple t;
- assertTrue(iter.hasNext());
+ assertTrue("iter.hasNext()",iter.hasNext());
t = iter.next();
- assertTrue(results.contains(t.toString()));
+ assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
- assertTrue(iter.hasNext());
+ assertTrue("iter.hasNext()",iter.hasNext());
t = iter.next();
- assertTrue(results.contains(t.toString()));
+ assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
- assertTrue(iter.hasNext());
+ assertTrue("iter.hasNext()",iter.hasNext());
t = iter.next();
- assertTrue(results.contains(t.toString()));
+ assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
assertFalse(iter.hasNext());
iter = pigServer.openIterator("B");
- assertTrue(iter.hasNext());
+ assertTrue("iter.hasNext()",iter.hasNext());
t = iter.next();
- assertTrue(results.contains(t.toString()));
+ assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
- assertTrue(iter.hasNext());
+ assertTrue("iter.hasNext()",iter.hasNext());
t = iter.next();
- assertTrue(results.contains(t.toString()));
+ assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
- assertTrue(iter.hasNext());
+ assertTrue("iter.hasNext()",iter.hasNext());
t = iter.next();
- assertTrue(results.contains(t.toString()));
+ assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
assertFalse(iter.hasNext());
@@ -310,17 +311,17 @@ public class TestNativeMapReduce extends
Iterator<Tuple> iter = pigServer.openIterator("C");
Tuple t;
- assertTrue(iter.hasNext());
+ assertTrue("iter.hasNext()",iter.hasNext());
t = iter.next();
- assertTrue(results.contains(t.toString()));
+ assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
- assertTrue(iter.hasNext());
+ assertTrue("iter.hasNext()",iter.hasNext());
t = iter.next();
- assertTrue(results.contains(t.toString()));
+ assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
- assertTrue(iter.hasNext());
+ assertTrue("iter.hasNext()",iter.hasNext());
t = iter.next();
- assertTrue(results.contains(t.toString()));
+ assertTrue(exp_msg_prefix + t, results.contains(t.toString()));
assertFalse(iter.hasNext());