You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by th...@apache.org on 2011/03/04 19:15:14 UTC

svn commit: r1078085 [7/12] - in /pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ sr...

Added: pig/trunk/src/org/apache/pig/parser/QueryParserUtils.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/QueryParserUtils.java?rev=1078085&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/QueryParserUtils.java (added)
+++ pig/trunk/src/org/apache/pig/parser/QueryParserUtils.java Fri Mar  4 18:15:11 2011
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.parser;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.backend.datastorage.ContainerDescriptor;
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.backend.datastorage.ElementDescriptor;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.logical.relational.LOStore;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+
+/**
+ * Static helpers used by the query parser: attaching a store operator to a
+ * logical plan, resolving the current directory, recording remote HDFS
+ * servers referenced by a path, and building signature strings.
+ */
+public class QueryParserUtils {
+    private static final Log log = LogFactory.getLog( QueryParserUtils.class );
+
+    private static final String HDFS_SCHEME = "hdfs";
+    private static final String HAR_SCHEME = "har";
+    // Prefix removed from the host of har URIs before comparing hosts.
+    private static final String HAR_HOST_PREFIX = "hdfs-";
+
+    /**
+     * Strips one pair of enclosing single quotes, if present.
+     *
+     * @param str the possibly quoted string
+     * @return str without its first and last character when both are single
+     *         quotes; otherwise str unchanged
+     */
+    private static String removeQuotes(String str) {
+        // Require two characters: a lone quote both starts and ends with a
+        // quote, and substring(1, 0) would throw for it.
+        if (str.length() >= 2 && str.startsWith("\u005c'") && str.endsWith("\u005c'")) {
+            return str.substring(1, str.length() - 1);
+        }
+        return str;
+    }
+
+    /**
+     * Builds an LOStore for the given output location, adds it to the plan and
+     * connects it to {@code input}.
+     *
+     * @param lp the logical plan the store operator is added to
+     * @param fileName output location, optionally wrapped in single quotes
+     * @param func store function spec; PigStorage is used when null
+     * @param input the operator whose output is stored
+     * @param alias alias set on the store operator
+     * @param pigContext context used to look up the current directory
+     * @throws FrontendException if resolving the store location fails with an
+     *         IOException, which is kept as the cause
+     */
+    public static void attachStorePlan(LogicalPlan lp, String fileName, String func,
+            Operator input, String alias, PigContext pigContext) throws FrontendException {
+        if( func == null ) {
+            func = PigStorage.class.getName();
+        }
+
+        FuncSpec funcSpec = new FuncSpec( func );
+        StoreFuncInterface stoFunc = (StoreFuncInterface)PigContext.instantiateFuncFromSpec( funcSpec );
+        // The signature uses the file name exactly as passed in (quotes included).
+        stoFunc.setStoreFuncUDFContextSignature( LOStore.constructSignature( alias, fileName, funcSpec ) );
+
+        fileName = removeQuotes( fileName );
+        FileSpec fileSpec = new FileSpec( fileName, funcSpec );
+        LOStore store = new LOStore( lp, fileSpec );
+        store.setAlias( alias );
+
+        try {
+            // NOTE(review): the returned path is not used here; confirm that is intended.
+            stoFunc.relToAbsPathForStoreLocation( fileName, getCurrentDir( pigContext ) );
+        } catch (IOException ioe) {
+            throw new FrontendException( ioe.getMessage(), ioe );
+        }
+
+        lp.add( store );
+        lp.connect( input, store );
+    }
+
+    /**
+     * Returns the active container of the context's data storage as a Path.
+     *
+     * @param pigContext context providing the data storage
+     * @return path built from the string form of the active container
+     * @throws IOException declared for the data storage calls made here
+     */
+    static Path getCurrentDir(PigContext pigContext) throws IOException {
+        DataStorage dfs = pigContext.getDfs();
+        ContainerDescriptor desc = dfs.getActiveContainer();
+        ElementDescriptor el = dfs.asElement(desc);
+        return new Path(el.toString());
+    }
+
+    /**
+     * Appends every remote HDFS server referenced by {@code absolutePath} to
+     * the "mapreduce.job.hdfs-servers" property unless it is already listed.
+     *
+     * @param absolutePath one or more comma-separated paths
+     * @param pigContext context whose properties are read and updated
+     * @throws URISyntaxException if "fs.default.name" is not a valid URI
+     */
+    static void setHdfsServers(String absolutePath, PigContext pigContext) throws URISyntaxException {
+        // Get native host. Without "fs.default.name" there is no default host
+        // to exclude, so every named host counts as remote.
+        String defaultFS = (String)pigContext.getProperties().get("fs.default.name");
+        String defaultHost = (defaultFS == null) ? null : new URI(defaultFS).getHost();
+        if (defaultHost == null) defaultHost = "";
+        defaultHost = defaultHost.toLowerCase();
+
+        Set<String> remoteHosts = getRemoteHosts(absolutePath, defaultHost);
+
+        String hdfsServersString = (String)pigContext.getProperties().get("mapreduce.job.hdfs-servers");
+        if (hdfsServersString == null) hdfsServersString = "";
+        String[] hdfsServers = hdfsServersString.split(",");
+
+        for (String remoteHost : remoteHosts) {
+            boolean existing = false;
+            for (String hdfsServer : hdfsServers) {
+                if (hdfsServer.equals(remoteHost)) {
+                    existing = true;
+                    break;
+                }
+            }
+            if (!existing) {
+                if (!hdfsServersString.isEmpty()) {
+                    hdfsServersString = hdfsServersString + ",";
+                }
+                hdfsServersString = hdfsServersString + remoteHost;
+            }
+        }
+
+        if (!hdfsServersString.isEmpty()) {
+            pigContext.getProperties().setProperty("mapreduce.job.hdfs-servers", hdfsServersString);
+        }
+    }
+
+    /**
+     * Collects hdfs:// addresses for the hosts named by hdfs and har URIs in
+     * {@code absolutePath}, leaving out {@code defaultHost}.
+     *
+     * @param absolutePath one or more comma-separated paths
+     * @param defaultHost lower-cased host of the default file system, or ""
+     * @return addresses of the form hdfs://host or hdfs://host:port; never null
+     */
+    static Set<String> getRemoteHosts(String absolutePath, String defaultHost) {
+        Set<String> result = new HashSet<String>();
+        for (String fname : absolutePath.split(",")) {
+            // remove leading/trailing whitespace(s)
+            URI uri = new Path(fname.trim()).toUri();
+            String scheme = uri.getScheme();
+            String host = uri.getHost();
+            // A relative path has no scheme; a URI without a host names no server.
+            if (scheme == null || host == null || host.isEmpty()) {
+                continue;
+            }
+            boolean isHar = HAR_SCHEME.equalsIgnoreCase(scheme);
+            if (!isHar && !HDFS_SCHEME.equalsIgnoreCase(scheme)) {
+                continue;
+            }
+
+            String thisHost = host.toLowerCase();
+            if (isHar && thisHost.startsWith(HAR_HOST_PREFIX)) {
+                thisHost = thisHost.substring(HAR_HOST_PREFIX.length());
+            }
+            if (thisHost.equals(defaultHost)) {
+                continue;
+            }
+
+            if (uri.getPort() != -1) {
+                result.add("hdfs://" + thisHost + ":" + uri.getPort());
+            } else {
+                result.add("hdfs://" + thisHost);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Joins a file name and a function spec into a signature string.
+     *
+     * @return fileName, an underscore and the string form of funcSpec
+     */
+    static String constructFileNameSignature(String fileName, FuncSpec funcSpec) {
+        return fileName+"_"+funcSpec.toString();
+    }
+
+    /**
+     * Joins an alias, a file name and a function spec into a signature string.
+     *
+     * @return alias, filename and the string form of funcSpec joined by underscores
+     */
+    static String constructSignature(String alias, String filename, FuncSpec funcSpec) {
+        return alias+"_"+filename+"_"+funcSpec.toString();
+    }
+}

Added: pig/trunk/src/org/apache/pig/parser/StreamingCommandUtils.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/StreamingCommandUtils.java?rev=1078085&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/StreamingCommandUtils.java (added)
+++ pig/trunk/src/org/apache/pig/parser/StreamingCommandUtils.java Fri Mar  4 18:15:11 2011
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.parser;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.streaming.StreamingCommand;
+
+// Check and set files to be automatically shipped for the given StreamingCommand
+// Auto-shipping rules:
+// 1. If the command begins with either perl or python assume that the
+//    binary is the first non-quoted string it encounters that does not
+//    start with dash - subject to restrictions in (2).
+// 2. Otherwise, attempt to ship the first string from the command line as
+//    long as it does not come from /bin, /usr/bin, /usr/local/bin.
+//    An argument given as an absolute path is never shipped; any other
+//    argument is located by executing "which" and skipped when it lies under
+//    one of the skip paths. The paths can be made configurable via
+//    "set stream.skippath <paths>" option.
+public class StreamingCommandUtils {
+    private static final String PERL = "perl";
+    private static final String PYTHON = "python";
+    private static final char SINGLE_QUOTE = '\u005c'';
+    private static final char DOUBLE_QUOTE = '"';
+
+    private final PigContext pigContext;
+
+    public StreamingCommandUtils(PigContext pigContext) {
+        this.pigContext = pigContext;
+    }
+
+    /**
+     * Splits a command line into arguments. Whitespace in front of an argument
+     * is skipped. An unquoted argument ends at the next space; an argument that
+     * starts with a single or double quote ends at the matching quote and keeps
+     * both quotes.
+     *
+     * @param command the command line to split
+     * @return the arguments in order; empty when the command is empty or blank
+     * @throws ParseException if a quoted argument has no closing quote
+     */
+    static String[] splitArgs(String command) throws ParseException {
+        List<String> argv = new ArrayList<String>();
+        int length = command.length();
+        int beginIndex = 0;
+
+        while (beginIndex < length) {
+            // Skip spaces, stopping at the end of the command so that trailing
+            // whitespace cannot push the index past the last character.
+            while (beginIndex < length && Character.isWhitespace(command.charAt(beginIndex))) {
+                ++beginIndex;
+            }
+            if (beginIndex == length) {
+                break;
+            }
+
+            char delim = ' ';
+            char charAtIndex = command.charAt(beginIndex);
+            if (charAtIndex == SINGLE_QUOTE || charAtIndex == DOUBLE_QUOTE) {
+                delim = charAtIndex;
+            }
+            boolean quoted = (delim != ' ');
+
+            int endIndex = command.indexOf(delim, beginIndex+1);
+            if (endIndex == -1) {
+                if (quoted) {
+                    // Didn't find the ending quote/double-quote
+                    throw new ParseException("Illegal command: " + command);
+                }
+                // Reached end of command-line
+                argv.add(command.substring(beginIndex));
+                break;
+            }
+
+            // Keep the closing quote of a quoted argument; do not consume the
+            // space that ends an unquoted one.
+            argv.add(command.substring(beginIndex, quoted ? endIndex + 1 : endIndex));
+            beginIndex = endIndex + 1;
+        }
+
+        return argv.toArray(new String[argv.size()]);
+    }
+
+    /**
+     * Picks the auto-ship candidate from a split command line and ships it when
+     * allowed. For perl and python the candidate is the first argument after
+     * the interpreter that neither starts with a dash nor is single-quoted;
+     * for any other command it is the first argument.
+     *
+     * @param command the streaming command that receives the path to ship
+     * @param argv the command line as split by splitArgs; nothing is done when
+     *        it is null or empty
+     * @throws ParseException if adding the path to the command fails
+     */
+    void checkAutoShipSpecs(StreamingCommand command, String[] argv)
+    throws ParseException {
+        // An empty command line has no candidate (and no argv[0] to read).
+        if (argv == null || argv.length == 0) {
+            return;
+        }
+
+        // Candidate for auto-ship
+        String arg0 = argv[0];
+
+        // Check if command is perl or python ... if so use the first non-option
+        // and non-quoted string as the candidate
+        if (arg0.equalsIgnoreCase(PERL) || arg0.equalsIgnoreCase(PYTHON)) {
+            for (int i=1; i < argv.length; ++i) {
+                if (!argv[i].startsWith("-") && !isQuotedString(argv[i])) {
+                    checkAndShip(command, argv[i]);
+                    break;
+                }
+            }
+        } else {
+            // Ship the first argument if it can be ...
+            checkAndShip(command, arg0);
+        }
+    }
+
+    /**
+     * Ships {@code arg} with the command unless it is an absolute path, cannot
+     * be located with "which", or lies under one of the paths to skip.
+     *
+     * @throws ParseException wrapping the IOException raised while adding the path
+     */
+    private void checkAndShip(StreamingCommand command, String arg)
+    throws ParseException {
+        // Don't auto-ship if it is an absolute path...
+        if (arg.startsWith("/")) {
+            return;
+        }
+
+        // $ which arg
+        String argPath = which(arg);
+        if (argPath != null && !inSkipPaths(argPath)) {
+            try {
+                command.addPathToShip(argPath);
+            } catch(IOException e) {
+                ParseException pe = new ParseException(e.getMessage());
+                pe.initCause(e);
+                throw pe;
+            }
+        }
+    }
+
+    /**
+     * Tells whether {@code s} starts and ends with a single quote.
+     * An empty string is not quoted.
+     */
+    private static boolean isQuotedString(String s) {
+        return !s.isEmpty()
+            && s.charAt(0) == SINGLE_QUOTE
+            && s.charAt(s.length()-1) == SINGLE_QUOTE;
+    }
+
+    // Check if file is in the list of paths to be skipped
+    private boolean inSkipPaths(String file) {
+        for (String skipPath : pigContext.getPathsToSkip()) {
+            if (file.startsWith(skipPath)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Runs "which" for {@code file}.
+     *
+     * @return the first line printed by "which" when it exits with status 0;
+     *         null when it exits with another status or cannot be run
+     */
+    private static String which(String file) {
+        Process process = null;
+        BufferedReader stdout = null;
+        try {
+            process = new ProcessBuilder(new String[] {"which", file}).start();
+            stdout = new BufferedReader(new InputStreamReader(process.getInputStream()));
+            String fullPath = stdout.readLine();
+
+            return (process.waitFor() == 0) ? fullPath : null;
+        } catch (InterruptedException e) {
+            // Keep the interrupt visible to the caller; the lookup counts as failed.
+            Thread.currentThread().interrupt();
+        } catch (Exception ignored) {
+            // Best effort: if "which" cannot be run, nothing is auto-shipped.
+        } finally {
+            if (stdout != null) {
+                try {
+                    stdout.close();
+                } catch (IOException ignored) {
+                    // Nothing useful can be done when closing the pipe fails.
+                }
+            }
+            if (process != null) {
+                // Make sure the child process does not outlive this call.
+                process.destroy();
+            }
+        }
+        return null;
+    }
+}

Modified: pig/trunk/src/org/apache/pig/pen/AugmentBaseDataVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/AugmentBaseDataVisitor.java?rev=1078085&r1=1078084&r2=1078085&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/AugmentBaseDataVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/pen/AugmentBaseDataVisitor.java Fri Mar  4 18:15:11 2011
@@ -184,8 +184,8 @@ public class AugmentBaseDataVisitor exte
         int numCols = -1;
 
         for (int index = 0; index < cg.getInputs((LogicalPlan)plan).size(); ++index) {
-            Collection<LogicalExpressionPlan> groupByPlans = (List<LogicalExpressionPlan>) cg
-                    .getExpressionPlans().get(index);
+            Collection<LogicalExpressionPlan> groupByPlans = 
+                cg.getExpressionPlans().get(index);
             List<Integer> groupCols = new ArrayList<Integer>();
             for (LogicalExpressionPlan plan : groupByPlans) {
                 Operator leaf = plan.getSinks().get(0);
@@ -299,8 +299,8 @@ public class AugmentBaseDataVisitor exte
         int numCols = -1;
 
         for (int index = 0; index < join.getInputs((LogicalPlan)plan).size(); ++index) {
-            Collection<LogicalExpressionPlan> groupByPlans = (List<LogicalExpressionPlan>) join
-                    .getExpressionPlans().get(index);
+            Collection<LogicalExpressionPlan> groupByPlans = 
+                join.getExpressionPlans().get(index);
             List<Integer> groupCols = new ArrayList<Integer>();
             for (LogicalExpressionPlan plan : groupByPlans) {
                 Operator leaf = plan.getSinks().get(0);

Modified: pig/trunk/src/org/apache/pig/pen/ExampleGenerator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/ExampleGenerator.java?rev=1078085&r1=1078084&r2=1078085&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/ExampleGenerator.java (original)
+++ pig/trunk/src/org/apache/pig/pen/ExampleGenerator.java Fri Mar  4 18:15:11 2011
@@ -43,8 +43,8 @@ import org.apache.pig.impl.io.FileLocali
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.newplan.logical.relational.LOForEach;
 import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
 import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
-import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.newplan.logical.relational.LogicalSchema;
 import org.apache.pig.newplan.logical.relational.LOSort;
 import org.apache.pig.newplan.logical.relational.LOLimit;
@@ -60,7 +60,7 @@ import org.apache.pig.pen.util.LineageTr
 public class ExampleGenerator {
 
     LogicalPlan plan;
-    org.apache.pig.newplan.logical.relational.LogicalPlan newPlan;
+    LogicalPlan newPlan;
     Map<LOLoad, DataBag> baseData = null;
     PigContext pigContext;
 
@@ -126,7 +126,7 @@ public class ExampleGenerator {
         if (loads.size() != pRoots.size())
             throw new ExecException("Logical and Physical plans have different number of roots");
         logToPhyMap = execEngine.getLogToPhyMap();
-        forEachInnerLogToPhyMap = execEngine.getForEachInnerLogToPhyMap();
+        forEachInnerLogToPhyMap = execEngine.getForEachInnerLogToPhyMap(plan);
         poLoadToLogMap = new HashMap<PhysicalOperator, Operator>();
         logToDataMap = new HashMap<Operator, DataBag>();
         poToLogMap = new HashMap<PhysicalOperator, Operator>();

Modified: pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java?rev=1078085&r1=1078084&r2=1078085&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java Fri Mar  4 18:15:11 2011
@@ -619,6 +619,7 @@ public class GruntParser extends PigScri
     protected void processDump(String alias) throws IOException
     {
         if(mExplain == null) { // process only if not in "explain" mode
+        	executeBatch();
             Iterator<Tuple> result = mPigServer.openIterator(alias);
             while (result.hasNext())
             {

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java?rev=1078085&r1=1078084&r2=1078085&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java Fri Mar  4 18:15:11 2011
@@ -229,7 +229,7 @@ public abstract class PigStats {
             };
         }
  
-        boolean isConnected(Operator from, Operator to) {
+        public boolean isConnected(Operator from, Operator to) {
             List<Operator> succs = null;
             succs = getSuccessors(from);
             if (succs != null) {

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java?rev=1078085&r1=1078084&r2=1078085&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java Fri Mar  4 18:15:11 2011
@@ -66,27 +66,27 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.logicalLayer.LOCogroup;
-import org.apache.pig.impl.logicalLayer.LOCross;
-import org.apache.pig.impl.logicalLayer.LODistinct;
-import org.apache.pig.impl.logicalLayer.LOFilter;
-import org.apache.pig.impl.logicalLayer.LOForEach;
-import org.apache.pig.impl.logicalLayer.LOJoin;
-import org.apache.pig.impl.logicalLayer.LOLimit;
-import org.apache.pig.impl.logicalLayer.LONative;
-import org.apache.pig.impl.logicalLayer.LOSort;
-import org.apache.pig.impl.logicalLayer.LOSplit;
-import org.apache.pig.impl.logicalLayer.LOStream;
-import org.apache.pig.impl.logicalLayer.LOUnion;
-import org.apache.pig.impl.logicalLayer.LOVisitor;
-import org.apache.pig.impl.logicalLayer.LogicalOperator;
-import org.apache.pig.impl.logicalLayer.LogicalPlan;
-import org.apache.pig.impl.logicalLayer.LOCogroup.GROUPTYPE;
-import org.apache.pig.impl.logicalLayer.LOJoin.JOINTYPE;
+import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.JarManager;
 import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.logical.relational.LOCogroup;
+import org.apache.pig.newplan.logical.relational.LOCross;
+import org.apache.pig.newplan.logical.relational.LODistinct;
+import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOJoin;
+import org.apache.pig.newplan.logical.relational.LOLimit;
+import org.apache.pig.newplan.logical.relational.LONative;
+import org.apache.pig.newplan.logical.relational.LOSort;
+import org.apache.pig.newplan.logical.relational.LOSplit;
+import org.apache.pig.newplan.logical.relational.LOStream;
+import org.apache.pig.newplan.logical.relational.LOUnion;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
+import org.apache.pig.newplan.logical.relational.LOCogroup.GROUPTYPE;
+import org.apache.pig.newplan.logical.relational.LOJoin.JOINTYPE;
 import org.apache.pig.tools.pigstats.PigStats.JobGraph;
 
 /**
@@ -335,7 +335,7 @@ public class ScriptState {
         BitSet bs = new BitSet();
         try {
             new LogicalPlanFeatureVisitor(plan, bs).visit();
-        } catch (VisitorException e) {
+        } catch (FrontendException e) {
             LOG.warn("unable to get script feature", e);
         }
         scriptFeatures = bitSetToLong(bs);        
@@ -604,24 +604,23 @@ public class ScriptState {
         }        
     }    
     
-    static class LogicalPlanFeatureVisitor extends LOVisitor {
+    static class LogicalPlanFeatureVisitor extends LogicalRelationalNodesVisitor {
         
         private BitSet feature;
         
-        protected LogicalPlanFeatureVisitor(LogicalPlan plan, BitSet feature) {
-            super(plan, new DepthFirstWalker<LogicalOperator, 
-                    LogicalPlan>(plan));            
+        protected LogicalPlanFeatureVisitor(LogicalPlan plan, BitSet feature) throws FrontendException {
+            super(plan, new org.apache.pig.newplan.DepthFirstWalker(plan));            
             this.feature = feature;
         }
         
         @Override
-        protected void visit(LOCogroup op) throws VisitorException {
+        public void visit(LOCogroup op) {
             if (op.getGroupType() == GROUPTYPE.COLLECTED) {
                 feature.set(PIG_FEATURE.COLLECTED_GROUP.ordinal());
             } else if (op.getGroupType() == GROUPTYPE.MERGE) {
                 feature.set(PIG_FEATURE.MERGE_COGROUP.ordinal());                
             } else if (op.getGroupType() == GROUPTYPE.REGULAR) {
-                if (op.getInputs().size() > 1) {
+                if (op.getExpressionPlans().size() > 1) {
                     feature.set(PIG_FEATURE.COGROUP.ordinal());
                 } else {
                     feature.set(PIG_FEATURE.GROUP_BY.ordinal());
@@ -630,27 +629,27 @@ public class ScriptState {
         }
         
         @Override
-        protected void visit(LOCross op) throws VisitorException {
+        public void visit(LOCross op) {
             feature.set(PIG_FEATURE.CROSS.ordinal());
         }
         
         @Override
-        protected void visit(LODistinct op) throws VisitorException {
+        public void visit(LODistinct op) {
             feature.set(PIG_FEATURE.DISTINCT.ordinal());
         }
         
         @Override
-        protected void visit(LOFilter op) throws VisitorException {
+        public void visit(LOFilter op) {
             feature.set(PIG_FEATURE.FILTER.ordinal());
         }
         
         @Override
-        protected void visit(LOForEach op) throws VisitorException {
+        public void visit(LOForEach op) {
             
         }
                 
         @Override
-        protected void visit(LOJoin op) throws VisitorException {
+        public void visit(LOJoin op) {
             if (op.getJoinType() == JOINTYPE.HASH) {
                 feature.set(PIG_FEATURE.HASH_JOIN.ordinal());
             } else if (op.getJoinType() == JOINTYPE.MERGE) {
@@ -663,32 +662,32 @@ public class ScriptState {
         }
         
         @Override
-        protected void visit(LOLimit op) throws VisitorException {
+        public void visit(LOLimit op) {
             feature.set(PIG_FEATURE.LIMIT.ordinal());
         }
         
         @Override
-        protected void visit(LOSort op) throws VisitorException {
+        public void visit(LOSort op) {
             feature.set(PIG_FEATURE.ORDER_BY.ordinal());
         }
         
         @Override
-        protected void visit(LOStream op) throws VisitorException {
+        public void visit(LOStream op) {
             feature.set(PIG_FEATURE.STREAMING.ordinal());
         }
         
         @Override
-        protected void visit(LOSplit op) throws VisitorException {
+        public void visit(LOSplit op) {
             
         }
         
         @Override
-        protected void visit(LOUnion op) throws VisitorException {
+        public void visit(LOUnion op) {
             feature.set(PIG_FEATURE.UNION.ordinal());
         }
         
         @Override
-        protected void visit(LONative n) throws VisitorException {
+        public void visit(LONative n) {
             feature.set(PIG_FEATURE.NATIVE.ordinal());
         }
 

Modified: pig/trunk/test/org/apache/pig/parser/ParserTestingUtils.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/parser/ParserTestingUtils.java?rev=1078085&r1=1078084&r2=1078085&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/parser/ParserTestingUtils.java (original)
+++ pig/trunk/test/org/apache/pig/parser/ParserTestingUtils.java Fri Mar  4 18:15:11 2011
@@ -70,7 +70,9 @@ public class ParserTestingUtils {
     throws RecognitionException, ParsingFailureException, IOException {
         Tree ast = validateAst( query );
         
-        LogicalPlanGenerator walker = new LogicalPlanGenerator( new CommonTreeNodeStream( ast ) );
+        CommonTreeNodeStream input = new CommonTreeNodeStream( ast );
+        LogicalPlanBuilder builder = new LogicalPlanBuilder( input );
+        LogicalPlanGenerator walker = new LogicalPlanGenerator( input, builder );
         walker.query();
         
         if( 0 < walker.getNumberOfSyntaxErrors() ) 

Modified: pig/trunk/test/org/apache/pig/parser/TestAstValidator.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/parser/TestAstValidator.java?rev=1078085&r1=1078084&r2=1078085&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/parser/TestAstValidator.java (original)
+++ pig/trunk/test/org/apache/pig/parser/TestAstValidator.java Fri Mar  4 18:15:11 2011
@@ -88,11 +88,11 @@ public class TestAstValidator {
     public void tesNegative1() throws RecognitionException, IOException {
         try {
             ParserTestingUtils.validateAst( "A = load 'x' as ( u:int, v:long, u:chararray, w:bytearray );" );
-        } catch(ParsingFailureException ex) {
-            Assert.assertEquals( AstValidator.class, ex.getParsingClass() );
+        } catch(Exception ex) {
+            Assert.assertTrue( ex instanceof DuplicatedSchemaAliasException );
             return;
         }
-        Assert.assertTrue( false ); // should never come here.
+        Assert.fail( "Testcase should fail" );
     }
 
     /**
@@ -102,22 +102,22 @@ public class TestAstValidator {
     public void tesNegative2() throws RecognitionException, IOException {
         try {
             ParserTestingUtils.validateAst( "A = load 'x' as ( u:int, v:long, w:tuple( w:long, u:chararray, w:bytearray) );" );
-        } catch(ParsingFailureException ex) {
-            Assert.assertEquals( AstValidator.class, ex.getParsingClass() );
+        } catch(Exception ex) {
+            Assert.assertTrue( ex instanceof DuplicatedSchemaAliasException );
             return;
         }
-        Assert.assertTrue( false ); // should never come here.
+        Assert.fail( "Testcase should fail" );
     }
 
     @Test
     public void tesNegative3() throws RecognitionException, IOException {
         try {
             ParserTestingUtils.validateAst( "A = load 'x'; C = limit B 100;" );
-        } catch(ParsingFailureException ex) {
-            Assert.assertEquals( AstValidator.class, ex.getParsingClass() );
+        } catch(Exception ex) {
+            Assert.assertTrue( ex instanceof UndefinedAliasException );
             return;
         }
-        Assert.assertTrue( false ); // should never come here.
+        Assert.fail( "Testcase should fail" );
     }
     
     // TODO: need a test similar to above but for foreach inner plan.

Modified: pig/trunk/test/org/apache/pig/parser/TestColumnAliasConversion.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/parser/TestColumnAliasConversion.java?rev=1078085&r1=1078084&r2=1078085&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/parser/TestColumnAliasConversion.java (original)
+++ pig/trunk/test/org/apache/pig/parser/TestColumnAliasConversion.java Fri Mar  4 18:15:11 2011
@@ -45,8 +45,6 @@ public class TestColumnAliasConversion {
         verify( query );
     }
 
-    // Temporary disable the test. Will reenable it after parser fix checked in
-    /*
     @Test
     public void test2() throws RecognitionException, ParsingFailureException, IOException {
         String query = "A = load 'x' as ( u:bag{tuple(x, y)}, v:long, w:bytearray); " + 
@@ -70,7 +68,6 @@ public class TestColumnAliasConversion {
                        "C = store B into 'output';";
         validate( query );
     }
-    */
     
     @Test
     public void test5() throws RecognitionException, ParsingFailureException, IOException {
@@ -97,21 +94,18 @@ public class TestColumnAliasConversion {
         validate( query );
     }
     
-    // Temporary disable the test. Will reenable it after parser fix checked in
-    /*
-    @Test
+   @Test
     public void testNegative1() throws RecognitionException, ParsingFailureException, IOException {
         String query = "A = load 'x' as ( u:bag{tuple(x, y)}, v:long, w:bytearray); " + 
                        "B = foreach A generate u.(x, $3), v, w; " +
                        "C = store B into 'output';";
         try {
-        	validate( query );
+            validate( query );
         } catch(PlanValidationException ex) {
-        	return;
+            return;
         }
         Assert.fail( "Query should fail to validate." );
     }
-    */
     
     @Test
     public void testNegative2() throws RecognitionException, ParsingFailureException, IOException {
@@ -119,9 +113,9 @@ public class TestColumnAliasConversion {
                        "B = foreach A generate u.(x, y), v, $5; " +
                        "C = store B into 'output';";
         try {
-        	validate( query );
+            validate( query );
         } catch(PlanValidationException ex) {
-        	return;
+            return;
         }
         Assert.fail( "Query should fail to validate." );
     }
@@ -132,28 +126,25 @@ public class TestColumnAliasConversion {
                        "B = foreach A generate u.(x, y), v, x; " +
                        "C = store B into 'output';";
         try {
-        	validate( query );
+            validate( query );
         } catch(PlanValidationException ex) {
-        	return;
+            return;
         }
         Assert.fail( "Query should fail to validate." );
     }
     
-    // Temporary disable the test. Will reenable it after parser fix checked in
-    /*
     @Test
     public void testNegative4() throws RecognitionException, ParsingFailureException, IOException {
         String query = "A = load 'x' as ( u:bag{tuple(x, y)}, v:long, w:bytearray); " + 
                        "B = foreach A generate u.z, v, w; " +
                        "C = store B into 'output';";
         try {
-        	validate( query );
+            validate( query );
         } catch(PlanValidationException ex) {
-        	return;
+            return;
         }
         Assert.fail( "Query should fail to validate." );
     }
-    */
     
     @Test
     public void testNegative5() throws RecognitionException, ParsingFailureException, IOException {
@@ -161,9 +152,9 @@ public class TestColumnAliasConversion {
                        "B = foreach A generate u, $1; " +
                        "C = store B into 'output';";
         try {
-        	validate( query );
+            validate( query );
         } catch(PlanValidationException ex) {
-        	return;
+            return;
         }
         Assert.fail( "Query should fail to validate." );
     }

Modified: pig/trunk/test/org/apache/pig/parser/TestLexer.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/parser/TestLexer.pig?rev=1078085&r1=1078084&r2=1078085&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/parser/TestLexer.pig (original)
+++ pig/trunk/test/org/apache/pig/parser/TestLexer.pig Fri Mar  4 18:15:11 2011
@@ -23,7 +23,7 @@
 
 aa = load '/data/intermediate/pow/elcarobootstrap/account/full/weekly/data/$date' using org.apache.pig.PigStorage('\n');
 bb = filter aa by (ARITY == '16') and ( $4 eq '' or $4 eq 'NULL' or $4 eq 'ss') parallel 400;
-a = foreach bb generate $0,$12,$7;
+a = foreach bb generate $0,$12,$7, $1.$1;
 
 define myudf org.apache.pig.TextLoader( 'test', 'data' );
 
@@ -54,13 +54,13 @@ E = join A by $0, B by $0 using 'replica
 
 F = Cross A, B;
 
-G = Split A into X if $0 > 0, Y if $0 == 0;
+G = Split A into X if $0 > 0, Y if $0 == 2L;
 
 H = union onschema A, B;
 
 B = GROUP A ALL using 'collected';
 
-I = foreach A generate flatten(c);
+I = foreach A generate flatten(B::c);
 
 CMD = `ls -l`;
 A = stream through CMD;

Modified: pig/trunk/test/org/apache/pig/parser/TestLogicalPlanGenerator.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/parser/TestLogicalPlanGenerator.java?rev=1078085&r1=1078084&r2=1078085&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/parser/TestLogicalPlanGenerator.java (original)
+++ pig/trunk/test/org/apache/pig/parser/TestLogicalPlanGenerator.java Fri Mar  4 18:15:11 2011
@@ -29,13 +29,14 @@ import org.junit.Test;
 public class TestLogicalPlanGenerator {
     @Test
     public void test1() {
-        String query = "A = load 'x' using org.apache.pig.TextLoader( 'a', 'b' ) as ( u:int, v:long, w:bytearray); " + 
+        String query = "A = load 'x' as ( u:int, v:long, w:bytearray); " + 
                        "B = limit A 100; " +
                        "C = filter B by 2 > 1; " +
                        "D = load 'y' as (d1, d2); " +
                        "E = join C by ( $0, $1 ), D by ( d1, d2 ) using 'replicated' parallel 16; " +
                        "F = store E into 'output';";
         generateLogicalPlan( query );
+
     }
 
     @Test
@@ -55,6 +56,11 @@ public class TestLogicalPlanGenerator {
 
     @Test
     public void test3() {
+        String query = "a = load '1.txt'  as (name, age, gpa);" + 
+                       "b = group a by name PARTITION BY org.apache.pig.test.utils.SimpleCustomPartitioner2;" +
+                       "c = foreach b generate group, COUNT(a.age);" +
+                       "store c into 'y';";
+        generateLogicalPlan( query );
     }
     
     private void generateLogicalPlan(String query) {
@@ -81,7 +87,7 @@ public class TestLogicalPlanGenerator {
     // Test define function.
     @Test
     public void test5() {
-        String query = "define myudf org.apache.pig.TextLoader( 'test', 'data' );" +
+        String query = "define myudf org.apache.pig.builtin.PigStorage( ',' );" +
                        "A = load 'x' using myudf;" +
                        "store A into 'y';";
         generateLogicalPlan( query );
@@ -137,28 +143,87 @@ public class TestLogicalPlanGenerator {
     }
 
     @Test
+    public void test12() {
+        String query = "define CMD `perl GroupBy.pl '\t' 0 1` ship('/homes/jianyong/pig_harness/libexec/PigTest/GroupBy.pl');" +
+                       "A = load '/user/pig/tests/data/singlefile/studenttab10k';" +
+                       "B = group A by $0;" +
+                       "C = foreach B {" +
+                       "   D = order A by $1; " +
+                       "   generate flatten(D);" +
+                       "};" +
+                       "E = stream C through CMD;" +
+                       "store E into '/user/pig/out/jianyong.1297238871/ComputeSpec_8.out';";
+        generateLogicalPlan( query );
+    }
+    
+    @Test
+    public void test13() {
+        String query = "define CMD `perl PigStreaming.pl` ship('/homes/jianyong/pig_harness/libexec/PigTest/PigStreaming.pl') stderr('CMD');" +
+                       "A = load '/user/pig/tests/data/singlefile/studenttab10k';" +
+                       "C = stream A through CMD;" +
+                       "store C into '/user/pig/out/jianyong.1297238871/StreamingPerformance_1.out';";
+        generateLogicalPlan( query );
+    }
+    
+    @Test
+    public void test14() {
+        String query = "a = load '/user/pig/tests/data/singlefile/studenttab10k' using PigStorage() as (name, age:int, gpa);" +
+                       "b = load '/user/pig/tests/data/singlefile/votertab10k' as (name, age, registration, contributions);" +
+                       "e = cogroup a by name, b by name parallel 8;" +
+                       "f = foreach e generate group,  SUM(a.age) as s;" +
+                       "g = filter f by s>0;" +
+                       "store g into '/user/pig/out/jianyong.1297323675/Accumulator_1.out';";
+        generateLogicalPlan( query );
+    }
+    
+    @Test
+    public void test15() {
+        String query = "a = load '/user/pig/tests/data/singlefile/studenttab10k' using PigStorage() as (name, age, gpa);" +
+                       "b = group a all;" +
+                       "c = foreach b generate AVG(a.age) as avg; " +
+                       "d = load '/user/pig/tests/data/singlefile/votertab10k' using PigStorage() as (name, age, registration, contributions);" +
+                       "e = group d all;" +
+                       "f = foreach e generate AVG(d.age) as avg;" +
+                       "y = foreach a generate age/c.avg, age/f.avg;" +
+                       "store y into '/user/pig/out/jianyong.1297323675/Scalar_4.out';";
+        generateLogicalPlan( query );
+    }
+    
+    @Test
+    public void test16() {
+        String query = "AA = load '/user/pig/tests/data/singlefile/studenttab10k';" +
+                       "A = foreach (group (filter AA by $0 > 0) all) generate flatten($1);" +
+                       "store A into '/user/pig/out/jianyong.1297323675/Scalar_4.out';";
+        generateLogicalPlan( query );
+    }
+
+    @Test
     public void testFilter() {
-        String query = "A = load 'x' using org.apache.pig.TextLoader( 'a', 'b' ) as ( u:int, v:long, w:bytearray); " + 
+        String query = "A = load 'x' as ( u:int, v:long, w:bytearray); " + 
                        "B = filter A by 2 > 1; ";
         generateLogicalPlan( query );
     }
 
     @Test
+    public void testScopedAlias() {
+        String query = "A = load 'x' as ( u:int, v:long, w:bytearray);" + 
+                       "B = load 'y' as ( u:int, x:int, y:chararray);" +
+                       "C = join A by u, B by u;" +
+                       "D = foreach C generate A::u, B::u, v, x;" +
+                       "store D into 'z';";
+        generateLogicalPlan ( query );
+    }
+
+    @Test
     public void testNegative1() {
         String query = "A = load 'x' as ( a : bag{ T:tuple(u, v) }, c : int, d : long );" +
                        "B = foreach A { S = c * 2; T = limit S 100; generate T; };" +
                        "store B into 'y';";
         try {
             ParserTestingUtils.generateLogicalPlan( query );
-        } catch (RecognitionException e) {
-            e.printStackTrace();
-        } catch (ParsingFailureException e) {
-            // Expected exception.
-            e.printStackTrace();
-            Assert.assertEquals( e.getParsingClass(), LogicalPlanGenerator.class );
+        } catch(Exception ex) {
+            Assert.assertTrue( ex instanceof NonProjectExpressionException );
             return;
-        } catch (IOException e) {
-            e.printStackTrace();
         }
         Assert.fail( "Query is supposed to be failing." );
     }

Modified: pig/trunk/test/org/apache/pig/parser/TestParser.pig
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/parser/TestParser.pig?rev=1078085&r1=1078084&r2=1078085&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/parser/TestParser.pig (original)
+++ pig/trunk/test/org/apache/pig/parser/TestParser.pig Fri Mar  4 18:15:11 2011
@@ -67,6 +67,8 @@ B = GROUP A ALL using 'collected';
 
 --join
 E = join A by $0, B by $0 using 'replicated';
+H = join A by u, B by u;
+I = foreach H generate A::u, B::u;
 
 --croos
 F = Cross A, B;

Modified: pig/trunk/test/org/apache/pig/parser/TestQueryLexer.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/parser/TestQueryLexer.java?rev=1078085&r1=1078084&r2=1078085&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/parser/TestQueryLexer.java (original)
+++ pig/trunk/test/org/apache/pig/parser/TestQueryLexer.java Fri Mar  4 18:15:11 2011
@@ -48,7 +48,7 @@ public class TestQueryLexer {
         
         // While we can check more conditions, such as type of each token, for now I think the following
         // is enough. If the token type is wrong, it will be most likely caught by the parser.
-        Assert.assertEquals( 413, tokenCount );
+        Assert.assertEquals( 402, tokenCount );
         Assert.assertEquals( 0, lexer.getNumberOfSyntaxErrors() );
     }
     

Modified: pig/trunk/test/org/apache/pig/parser/TestQueryParser.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/parser/TestQueryParser.java?rev=1078085&r1=1078084&r2=1078085&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/parser/TestQueryParser.java (original)
+++ pig/trunk/test/org/apache/pig/parser/TestQueryParser.java Fri Mar  4 18:15:11 2011
@@ -85,7 +85,70 @@ public class TestQueryParser {
 
     @Test
     public void test3() throws IOException, RecognitionException {
-        shouldPass("a = load '1.txt' as (a0); b = foreach a generate flatten((bag{T:tuple(m:map[])})a0) as b0:map[];c = foreach b generate (long)b0#'key1';");
+        String query = "a = load '1.txt' as (a0);" +
+                       "b = foreach a generate flatten( (bag{tuple(map[])})a0 ) as b0:map[];" +
+                       "c = foreach b generate (long)b0#'key1';";
+        shouldPass( query );
+    }
+
+    @Test
+    public void test4() throws IOException, RecognitionException {
+        String query = "a = load '1.txt'  as (name, age, gpa); b = group a by name;" +
+                       "c = foreach b generate group, COUNT(a.age);" +
+                       "store c into 'y';";
+        shouldPass( query );
+    }
+    
+    @Test
+    public void test5() throws IOException, RecognitionException {
+        String query = "a = load 'x' as (name, age, gpa);" +
+            "b = foreach a generate name, age +  2L, 3.125F, 3.4e2;" +
+            " store b into 'y'; ";
+        shouldPass( query );
+    }
+    
+    @Test
+    public void test6() throws IOException, RecognitionException {
+        String query = "a = load '/user/pig/tests/data/singlefile/studentnulltab10k' as (name:chararray, age:int, gpa:double);" +
+                       "b = foreach a generate (int)((int)gpa/((int)gpa - 1)) as norm_gpa:int;" + 
+                       "c = foreach b generate (norm_gpa is not null? norm_gpa: 0);" +
+                       "store c into '/user/pig/out/jianyong.1297229709/Types_37.out';";
+        shouldPass( query );
+    }
+    
+    @Test
+    public void test7() throws IOException, RecognitionException {
+        String query = "a = load '/user/pig/tests/data/singlefile/studenttab10k';" +
+                       "b = group a by $0;" +
+                       "c = foreach b {c1 = order $1 by * using org.apache.pig.test.udf.orderby.OrdDesc; generate flatten(c1); };" +
+                       "store c into '/user/pig/out/jianyong.1297305352/Order_15.out';";
+        shouldPass( query );
+    }
+    
+    @Test
+    public void test8() throws IOException, RecognitionException {
+        String query = "a = load '/user/pig/tests/data/singlefile/studenttab10k';" +
+                       "b = group a by $0;" +
+                       "c = foreach b {c1 = order $1 by $1; generate flatten(c1), MAX($1.$1); };" +
+                       "store c into '/user/pig/out/jianyong.1297305352/Order_17.out';";
+        shouldPass( query );
+    }
+    
+    @Test
+    public void test9() throws IOException, RecognitionException {
+        String query = "a = load 'x' as (u,v);" +
+                       "b = load 'y' as (u,w);" +
+                       "c = join a by u, b by u;" +
+                       "d = foreach c generate a::u, b::u, w;";
+        shouldPass( query );
+    }
+
+    @Test
+    public void test10() throws IOException, RecognitionException {
+        String query = "a = load 'x' as (name, age, gpa);" +
+            "b = FOREACH C GENERATE group, flatten( ( 1 == 2 ? 2 : 3 ) );" +
+            " store b into 'y'; ";
+        shouldPass( query );
     }
 
     @Test
@@ -184,7 +247,12 @@ public class TestQueryParser {
 
     private void shouldFail(String query) throws RecognitionException, IOException {
         System.out.println("Testing: " + query);
-        Assert.assertFalse(query + " should have failed", 0 == parse(query));
+        try {
+            parse( query );
+        } catch(Exception ex) {
+            return;
+        }
+        Assert.fail( query + " should have failed" );
     }
     
     private int parse(String query) throws IOException, RecognitionException  {

Modified: pig/trunk/test/org/apache/pig/parser/TestScalarVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/parser/TestScalarVisitor.java?rev=1078085&r1=1078084&r2=1078085&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/parser/TestScalarVisitor.java (original)
+++ pig/trunk/test/org/apache/pig/parser/TestScalarVisitor.java Fri Mar  4 18:15:11 2011
@@ -19,10 +19,13 @@
 package org.apache.pig.parser;
 
 import java.io.IOException;
+import java.util.Properties;
 
 import junit.framework.Assert;
 
 import org.antlr.runtime.RecognitionException;
+import org.apache.pig.ExecType;
+import org.apache.pig.impl.PigContext;
 import org.apache.pig.newplan.logical.relational.LogicalPlan;
 import org.apache.pig.newplan.logical.visitor.ScalarVisitor;
 
@@ -61,17 +64,19 @@ public class TestScalarVisitor {
                        "C = foreach A generate B.w, $0; " +
                        "D = store C into 'output';";
         try {
-        	visit( query );
-        } catch(ParsingFailureException ex) {
-        	// Expected exception
-        	return;
+            visit( query );
+        } catch(Exception ex) {
+            Assert.assertTrue( ex instanceof InvalidScalarProjectionException );
+            return;
         }
         Assert.fail( "Test case should fail" );
     }
 
     private LogicalPlan visit(String query) throws RecognitionException, ParsingFailureException, IOException {
         LogicalPlan plan = ParserTestingUtils.generateLogicalPlan( query );
-        ScalarVisitor visitor = new ScalarVisitor( plan );
+        PigContext pc = new PigContext( ExecType.LOCAL, new Properties() );
+        pc.connect();
+        ScalarVisitor visitor = new ScalarVisitor( plan, pc );
         visitor.visit();
         return plan;
     }

Modified: pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java?rev=1078085&r1=1078084&r2=1078085&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java Fri Mar  4 18:15:11 2011
@@ -22,7 +22,6 @@ import java.util.Iterator;
 import java.util.List;
 
 import junit.framework.Assert;
-import junit.framework.TestCase;
 
 import org.apache.pig.CollectableLoadFunc;
 import org.apache.pig.ExecType;
@@ -49,7 +48,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 @RunWith(JUnit4.class)
-public class TestCollectedGroup extends TestCase {
+public class TestCollectedGroup {
     private static final String INPUT_FILE = "MapSideGroupInput.txt";
     
     private PigServer pigServer;
@@ -95,9 +94,9 @@ public class TestCollectedGroup extends 
         pc.connect();
         try {
             Util.buildMRPlan(Util.buildPhysicalPlan(lp, pc),pc);  
-            fail("Must throw MRCompiler Exception");
+            Assert.fail("Must throw MRCompiler Exception");
         } catch (Exception e) {
-            assertTrue(e instanceof MRCompilerException);
+            Assert.assertTrue(e instanceof MRCompilerException);
         }
     }
 
@@ -107,7 +106,7 @@ public class TestCollectedGroup extends 
         LogicalPlanTester lpt = new LogicalPlanTester();
         lpt.buildPlan("A = LOAD '" + INPUT_FILE + "' as (id, name, grade);");
         LogicalPlan lp = lpt.buildPlan("B = group A by id using 'collected';");
-        assertEquals(LOCogroup.GROUPTYPE.COLLECTED, ((LOCogroup)lp.getLeaves().get(0)).getGroupType());
+        Assert.assertEquals(LOCogroup.GROUPTYPE.COLLECTED, ((LOCogroup)lp.getLeaves().get(0)).getGroupType());
     }
     
     @Test
@@ -116,7 +115,7 @@ public class TestCollectedGroup extends 
         LogicalPlanTester lpt = new LogicalPlanTester();
         lpt.buildPlan("A = LOAD '" + INPUT_FILE + "' as (id, name, grade);");
         LogicalPlan lp = lpt.buildPlan("B = group A all using 'regular';");
-        assertEquals(LOCogroup.GROUPTYPE.REGULAR, ((LOCogroup)lp.getLeaves().get(0)).getGroupType());
+        Assert.assertEquals(LOCogroup.GROUPTYPE.REGULAR, ((LOCogroup)lp.getLeaves().get(0)).getGroupType());
     }
     
     @AfterClass
@@ -141,11 +140,12 @@ public class TestCollectedGroup extends 
         pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (id, name, grade);");
     
         try {
-            pigServer.registerQuery("C = group A by id, B by id using \"collected\";");
-            fail("Pig doesn't support multi-input collected group.");
+            pigServer.registerQuery("C = group A by id, B by id using 'collected';");
+            pigServer.openIterator( "C" );
+            Assert.fail("Pig doesn't support multi-input collected group.");
         } catch (Exception e) {
-             Assert.assertEquals(e.getMessage(), 
-                "Error during parsing. Collected group is only supported for single input");
+            String msg = "Pig script failed to validate: Collected group is only supported for single input";
+            Assert.assertTrue( e.getMessage().contains( msg ) );
         }
     }
     
@@ -156,11 +156,12 @@ public class TestCollectedGroup extends 
         pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id, name, grade);");
     
         try {
-            pigServer.registerQuery("B = group A all using \"collected\";");
-            fail("Pig doesn't support collected group all.");
+            pigServer.registerQuery("B = group A all using 'collected';");
+            pigServer.openIterator( "B" );
+            Assert.fail("Pig doesn't support collected group all.");
         } catch (Exception e) {
-            Assert.assertEquals(e.getMessage(), 
-                "Error during parsing. Collected group is only supported for columns or star projection");
+            String msg = "Pig script failed to validate: Collected group is only supported for columns or star projection";
+            Assert.assertTrue( e.getMessage().contains( msg ) );
         }
     }
      
@@ -171,11 +172,12 @@ public class TestCollectedGroup extends 
         pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id, name, grade);");
     
         try {
-            pigServer.registerQuery("B = group A by id*grade using \"collected\";");
-            fail("Pig doesn't support collected group by expression.");
+            pigServer.registerQuery("B = group A by id*grade using 'collected';");
+            pigServer.openIterator("B");
+            Assert.fail("Pig doesn't support collected group by expression.");
         } catch (Exception e) {
-            Assert.assertEquals(e.getMessage(), 
-                "Error during parsing. Collected group is only supported for columns or star projection");
+            String msg = "Pig script failed to validate: Collected group is only supported for columns or star projection";
+            Assert.assertTrue( e.getMessage().contains( msg ) );
         }
     }
 
@@ -188,7 +190,7 @@ public class TestCollectedGroup extends 
             DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
             DataBag dbshj = BagFactory.getInstance().newDefaultBag();
             {
-                pigServer.registerQuery("B = group A by id using \"collected\";");
+                pigServer.registerQuery("B = group A by id using 'collected';");
                 pigServer.registerQuery("C = foreach B generate group, COUNT(A);");
                 Iterator<Tuple> iter = pigServer.openIterator("C");
 
@@ -210,7 +212,7 @@ public class TestCollectedGroup extends 
 
         } catch (Exception e) {
             e.printStackTrace();
-            fail(e.getMessage());
+            Assert.fail(e.getMessage());
         }
     }
  
@@ -224,7 +226,7 @@ public class TestCollectedGroup extends 
             DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
             DataBag dbshj = BagFactory.getInstance().newDefaultBag();
             {
-                pigServer.registerQuery("B = group A by (id, name) using \"collected\";");
+                pigServer.registerQuery("B = group A by (id, name) using 'collected';");
                 pigServer.registerQuery("C = foreach B generate group, COUNT(A);");
                 Iterator<Tuple> iter = pigServer.openIterator("C");
 
@@ -246,7 +248,7 @@ public class TestCollectedGroup extends 
 
         } catch (Exception e) {
             e.printStackTrace();
-            fail(e.getMessage());
+            Assert.fail(e.getMessage());
         }
     }
   
@@ -260,7 +262,7 @@ public class TestCollectedGroup extends 
             DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
             DataBag dbshj = BagFactory.getInstance().newDefaultBag();
             {
-                pigServer.registerQuery("B = group A by * using \"collected\";");
+                pigServer.registerQuery("B = group A by * using 'collected';");
                 pigServer.registerQuery("C = foreach B generate group, COUNT(A);");
                 Iterator<Tuple> iter = pigServer.openIterator("C");
 
@@ -282,7 +284,7 @@ public class TestCollectedGroup extends 
 
         } catch (Exception e) {
             e.printStackTrace();
-            fail(e.getMessage());
+            Assert.fail(e.getMessage());
         }
     }
 

Modified: pig/trunk/test/org/apache/pig/test/TestDataBagAccess.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestDataBagAccess.java?rev=1078085&r1=1078084&r2=1078085&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestDataBagAccess.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestDataBagAccess.java Fri Mar  4 18:15:11 2011
@@ -36,6 +36,7 @@ import org.apache.pig.data.DefaultTuple;
 import org.apache.pig.data.NonSpillableDataBag;
 import org.apache.pig.data.SingleTupleBag;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.util.MultiMap;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -124,11 +125,12 @@ public class TestDataBagAccess extends T
         boolean exceptionOccured = false;
         try {
             pigServer.registerQuery("c = foreach b generate mybag.t;");
-        } catch(IOException e) {
+            pigServer.explain("c", System.out);
+        } catch(FrontendException e) {
             exceptionOccured = true;
-            String msg = e.getMessage();
-            assertTrue(msg.contains("Only access to the elements of " +
-                    "the tuple in the bag is allowed."));
+            Throwable cause = e.getCause();
+            String msg = cause.getMessage();
+            Util.checkStrContainsSubStr(msg, "Invalid field reference. Referenced field [t] does not exist in schema");
         }
         assertTrue(exceptionOccured);
     }
@@ -141,7 +143,7 @@ public class TestDataBagAccess extends T
                 + Util.generateURI(Util.encodeEscape(input.toString()), pigServer.getPigContext()) + "';");
         pigServer.registerQuery("B = foreach A generate {(('p1-t1-e1', 'p1-t1-e2'),('p1-t2-e1', 'p1-t2-e2'))," +
                 "(('p2-t1-e1', 'p2-t1-e2'), ('p2-t2-e1', 'p2-t2-e2'))};");
-        pigServer.registerQuery("C = foreach B generate $0 as pairbag { pair: ( t1: (e1, e2), t2: (e1, e2) ) };");
+        pigServer.registerQuery("C = foreach B generate $0 as pairbag : { pair: ( t1: (e1, e2), t2: (e1, e2) ) };");
         pigServer.registerQuery("D = foreach C generate FLATTEN(pairbag);");
         pigServer.registerQuery("E = foreach D generate t1.e2 as t1e2, t2.e1 as t2e1;");
         Iterator<Tuple> it = pigServer.openIterator("E");

Modified: pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java?rev=1078085&r1=1078084&r2=1078085&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestEvalPipeline.java Fri Mar  4 18:15:11 2011
@@ -31,7 +31,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.StringTokenizer;
 
-import junit.framework.TestCase;
+import junit.framework.Assert;
 
 import org.apache.pig.ComparisonFunc;
 import org.apache.pig.EvalFunc;
@@ -63,7 +63,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 @RunWith(JUnit4.class)
-public class TestEvalPipeline extends TestCase {
+public class TestEvalPipeline {
     
     static MiniCluster cluster = MiniCluster.buildCluster();
     private PigServer pigServer;
@@ -73,7 +73,6 @@ public class TestEvalPipeline extends Te
     BagFactory mBf = BagFactory.getInstance();
     
     @Before
-    @Override
     public void setUp() throws Exception{
         FileLocalizer.setR(new Random());
         pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
@@ -109,7 +108,7 @@ public class TestEvalPipeline extends Te
         Iterator<Tuple> iter  = pigServer.openIterator("b");
         
         for (int i=0 ;i<3; i++){
-            assertEquals(DataType.toDouble(iter.next().get(0)), 0.0);
+            Assert.assertEquals(DataType.toDouble(iter.next().get(0)), 0.0);
         }
     }
     
@@ -130,10 +129,10 @@ public class TestEvalPipeline extends Te
         int count = 0;
         while(iter.hasNext()){
             Tuple t = iter.next();
-            assertTrue(t.get(0).toString().equals(t.get(2).toString()));
+            Assert.assertTrue(t.get(0).toString().equals(t.get(2).toString()));
             count++;
         }
-        assertEquals(count, 4);
+        Assert.assertEquals(count, 4);
     }
     
     @Test
@@ -150,11 +149,11 @@ public class TestEvalPipeline extends Te
         int count = 0;
         while(iter.hasNext()){
             Tuple t = iter.next();
-            assertTrue(t.get(0).toString().equals("1"));
-            assertTrue(t.get(1).toString().equals("a"));
+            Assert.assertTrue(t.get(0).toString().equals("1"));
+            Assert.assertTrue(t.get(1).toString().equals("a"));
             count++;
         }
-        assertEquals(count, 6);
+        Assert.assertEquals(count, 6);
         f.delete();
     }
     
@@ -185,9 +184,9 @@ public class TestEvalPipeline extends Te
         pigServer.registerQuery("b = foreach a generate $0#'apple',flatten($1#'orange');");
         Iterator<Tuple> iter = pigServer.openIterator("b");
         t = iter.next();
-        assertEquals(t.get(0).toString(), "red");
-        assertEquals(DataType.toDouble(t.get(1)), 0.3);
-        assertFalse(iter.hasNext());
+        Assert.assertEquals(t.get(0).toString(), "red");
+        Assert.assertEquals(DataType.toDouble(t.get(1)), 0.3);
+        Assert.assertFalse(iter.hasNext());
         Util.deleteFile(cluster, fileName);
     }
     
@@ -339,10 +338,10 @@ public class TestEvalPipeline extends Te
         pigServer.registerQuery("answer = FOREACH cogrouped GENERATE COUNT(queryLog),group;");
         
         Iterator<Tuple> iter = pigServer.openIterator("answer");
-        if(!iter.hasNext()) fail("No Output received");
+        if(!iter.hasNext()) Assert.fail("No Output received");
         while(iter.hasNext()){
             Tuple t = iter.next();
-            assertEquals(expectedResults.get(t.get(1).toString()).doubleValue(),(DataType.toDouble(t.get(0))).doubleValue());
+            Assert.assertEquals(expectedResults.get(t.get(1).toString()).doubleValue(),(DataType.toDouble(t.get(0))).doubleValue());
         }
     }
     
@@ -395,16 +394,16 @@ public class TestEvalPipeline extends Te
         Iterator<Tuple> iter = pigServer.openIterator("B");
         String last = "";
         HashSet<Integer> seen = new HashSet<Integer>();
-        if(!iter.hasNext()) fail("No Results obtained");
+        if(!iter.hasNext()) Assert.fail("No Results obtained");
         while (iter.hasNext()){
             Tuple t = iter.next();
             if (eliminateDuplicates){
                 Integer act = Integer.parseInt(t.get(0).toString());
-                assertFalse(seen.contains(act));
+                Assert.assertFalse(seen.contains(act));
                 seen.add(act);
             }else{
-                assertTrue(last.compareTo(t.get(0).toString())<=0);
-                assertEquals(t.size(), 2);
+                Assert.assertTrue(last.compareTo(t.get(0).toString())<=0);
+                Assert.assertEquals(t.size(), 2);
                 last = t.get(0).toString();
             }
         }        
@@ -435,19 +434,19 @@ public class TestEvalPipeline extends Te
 
         pigServer.registerQuery(query);
         Iterator<Tuple> iter = pigServer.openIterator("C");
-        if(!iter.hasNext()) fail("No output found");
+        if(!iter.hasNext()) Assert.fail("No output found");
         int numIdentity = 0;
         while(iter.hasNext()){
             Tuple t = iter.next();
-            assertEquals((Integer)numIdentity, (Integer)t.get(0));
-            assertEquals((Long)5L, (Long)t.get(2));
-            assertEquals(LOOP_COUNT*2.0, (Double)t.get(3), 0.01);
-            assertEquals(8.0, (Double)t.get(5), 0.01);
-            assertEquals(5L, ((DataBag)t.get(6)).size());
-            assertEquals(7, t.size());
+            Assert.assertEquals((Integer)numIdentity, (Integer)t.get(0));
+            Assert.assertEquals((Long)5L, (Long)t.get(2));
+            Assert.assertEquals(LOOP_COUNT*2.0, (Double)t.get(3), 0.01);
+            Assert.assertEquals(8.0, (Double)t.get(5), 0.01);
+            Assert.assertEquals(5L, ((DataBag)t.get(6)).size());
+            Assert.assertEquals(7, t.size());
             ++numIdentity;
         }
-        assertEquals(LOOP_COUNT, numIdentity);
+        Assert.assertEquals(LOOP_COUNT, numIdentity);
     }
 
     @Test
@@ -480,19 +479,19 @@ public class TestEvalPipeline extends Te
 
         pigServer.registerQuery(query);
         Iterator<Tuple> iter = pigServer.openIterator("C");
-        if(!iter.hasNext()) fail("No output found");
+        if(!iter.hasNext()) Assert.fail("No output found");
         int numIdentity = 0;
         while(iter.hasNext()){
             Tuple t = iter.next();
-            assertEquals((Integer)numIdentity, (Integer)t.get(0));
-            assertEquals((Long)5L, (Long)t.get(2));
-            assertEquals(LOOP_COUNT*2.0, (Double)t.get(3), 0.01);
-            assertEquals(8.0, (Double)t.get(5), 0.01);
-            assertEquals(5L, ((DataBag)t.get(6)).size());
-            assertEquals(7, t.size());
+            Assert.assertEquals((Integer)numIdentity, (Integer)t.get(0));
+            Assert.assertEquals((Long)5L, (Long)t.get(2));
+            Assert.assertEquals(LOOP_COUNT*2.0, (Double)t.get(3), 0.01);
+            Assert.assertEquals(8.0, (Double)t.get(5), 0.01);
+            Assert.assertEquals(5L, ((DataBag)t.get(6)).size());
+            Assert.assertEquals(7, t.size());
             ++numIdentity;
         }
-        assertEquals(LOOP_COUNT, numIdentity);
+        Assert.assertEquals(LOOP_COUNT, numIdentity);
     }
 
     @Test
@@ -509,13 +508,13 @@ public class TestEvalPipeline extends Te
                 + Util.generateURI(tmpFile.toString(), pigContext) + "';");
         pigServer.registerQuery("B = limit A 5;");
         Iterator<Tuple> iter = pigServer.openIterator("B");
-        if(!iter.hasNext()) fail("No output found");
+        if(!iter.hasNext()) Assert.fail("No output found");
         int numIdentity = 0;
         while(iter.hasNext()){
             iter.next();
             ++numIdentity;
         }
-        assertEquals(5, numIdentity);
+        Assert.assertEquals(5, numIdentity);
     }
     
     @Test
@@ -530,11 +529,11 @@ public class TestEvalPipeline extends Te
         pigServer.registerQuery("b = foreach a generate COUNT(b), t2.a, t2.b, m#'key1', m#'key2';");
         Iterator<Tuple> it = pigServer.openIterator("b");
         Tuple t = it.next();
-        assertEquals(new Long(2), t.get(0));
-        assertEquals("1", t.get(1).toString());
-        assertEquals("2", t.get(2).toString());
-        assertEquals("value1", t.get(3).toString());
-        assertEquals("value2", t.get(4).toString());
+        Assert.assertEquals(new Long(2), t.get(0));
+        Assert.assertEquals("1", t.get(1).toString());
+        Assert.assertEquals("2", t.get(2).toString());
+        Assert.assertEquals("value1", t.get(3).toString());
+        Assert.assertEquals("value2", t.get(4).toString());
         
         //test with BinStorage
         pigServer.registerQuery("a = load '" 
@@ -548,11 +547,11 @@ public class TestEvalPipeline extends Te
         pigServer.registerQuery("y = foreach x generate COUNT(b), t2.a, t2.b, m#'key1', m#'key2';");
         it = pigServer.openIterator("y");
         t = it.next();
-        assertEquals(new Long(2), t.get(0));
-        assertEquals("1", t.get(1).toString());
-        assertEquals("2", t.get(2).toString());
-        assertEquals("value1", t.get(3).toString());
-        assertEquals("value2", t.get(4).toString());        
+        Assert.assertEquals(new Long(2), t.get(0));
+        Assert.assertEquals("1", t.get(1).toString());
+        Assert.assertEquals("2", t.get(2).toString());
+        Assert.assertEquals("value1", t.get(3).toString());
+        Assert.assertEquals("value2", t.get(4).toString());        
     }
 
     @Test
@@ -567,11 +566,11 @@ public class TestEvalPipeline extends Te
         pigServer.registerQuery("b = foreach a generate COUNT(b), t2.a, t2.b, m#'key1', m#'key2';");
         Iterator<Tuple> it = pigServer.openIterator("b");
         Tuple t = it.next();
-        assertEquals(new Long(2), t.get(0));
-        assertEquals(1, t.get(1));
-        assertEquals(2, t.get(2));
-        assertEquals("value1", t.get(3).toString());
-        assertEquals("value2", t.get(4).toString());
+        Assert.assertEquals(new Long(2), t.get(0));
+        Assert.assertEquals(1, t.get(1));
+        Assert.assertEquals(2, t.get(2));
+        Assert.assertEquals("value1", t.get(3).toString());
+        Assert.assertEquals("value2", t.get(4).toString());
         
         //test with BinStorage
         pigServer.registerQuery("a = load '" 
@@ -596,19 +595,19 @@ public class TestEvalPipeline extends Te
             pigServer.registerQuery(generates[i]);
             it = pigServer.openIterator("q");
             t = it.next();
-            assertEquals(new Long(2), t.get(0));
-            assertEquals(Integer.class, t.get(1).getClass());
-            assertEquals(1, t.get(1));
-            assertEquals(Integer.class, t.get(2).getClass());
-            assertEquals(2, t.get(2));
-            assertEquals("value1", t.get(3).toString());
-            assertEquals("value2", t.get(4).toString());
-            assertEquals(DefaultDataBag.class, t.get(5).getClass());
+            Assert.assertEquals(new Long(2), t.get(0));
+            Assert.assertEquals(Integer.class, t.get(1).getClass());
+            Assert.assertEquals(1, t.get(1));
+            Assert.assertEquals(Integer.class, t.get(2).getClass());
+            Assert.assertEquals(2, t.get(2));
+            Assert.assertEquals("value1", t.get(3).toString());
+            Assert.assertEquals("value2", t.get(4).toString());
+            Assert.assertEquals(DefaultDataBag.class, t.get(5).getClass());
             DataBag bg = (DataBag)t.get(5);
             for (Iterator<Tuple> bit = bg.iterator(); bit.hasNext();) {
                 Tuple bt = bit.next();
-                assertEquals(String.class, bt.get(0).getClass());
-                assertEquals(String.class, bt.get(1).getClass());            
+                Assert.assertEquals(String.class, bt.get(0).getClass());
+                Assert.assertEquals(String.class, bt.get(1).getClass());            
             }
         }        
     }
@@ -625,9 +624,9 @@ public class TestEvalPipeline extends Te
         pigServer.registerQuery("c = foreach b generate flatten(a.(y, z));");
         Iterator<Tuple> it = pigServer.openIterator("c");
         Tuple t = it.next();
-        assertEquals(2, t.size());
-        assertEquals("f2", t.get(0).toString());
-        assertEquals("f3", t.get(1).toString());
+        Assert.assertEquals(2, t.size());
+        Assert.assertEquals("f2", t.get(0).toString());
+        Assert.assertEquals("f3", t.get(1).toString());
     }
 
     @Test
@@ -658,12 +657,12 @@ public class TestEvalPipeline extends Te
             pigServer.registerQuery(generates[i]);
             Iterator<Tuple> it = pigServer.openIterator("q");
             Tuple t = it.next();
-            assertEquals("pigtester", t.get(0));
-            assertEquals(String.class, t.get(0).getClass());
-            assertEquals(10, t.get(1));
-            assertEquals(Integer.class, t.get(1).getClass());
-            assertEquals(1.2, t.get(2));
-            assertEquals(Double.class, t.get(2).getClass());
+            Assert.assertEquals("pigtester", t.get(0));
+            Assert.assertEquals(String.class, t.get(0).getClass());
+            Assert.assertEquals(10, t.get(1));
+            Assert.assertEquals(Integer.class, t.get(1).getClass());
+            Assert.assertEquals(1.2, t.get(2));
+            Assert.assertEquals(Double.class, t.get(2).getClass());
         }
         
         // test that valid casting is allowed
@@ -672,12 +671,12 @@ public class TestEvalPipeline extends Te
         pigServer.registerQuery("q = foreach p generate name, age, gpa;");
         Iterator<Tuple> it = pigServer.openIterator("q");
         Tuple t = it.next();
-        assertEquals("pigtester", t.get(0));
-        assertEquals(String.class, t.get(0).getClass());
-        assertEquals(10L, t.get(1));
-        assertEquals(Long.class, t.get(1).getClass());
-        assertEquals(1.2f, t.get(2));
-        assertEquals(Float.class, t.get(2).getClass());
+        Assert.assertEquals("pigtester", t.get(0));
+        Assert.assertEquals(String.class, t.get(0).getClass());
+        Assert.assertEquals(10L, t.get(1));
+        Assert.assertEquals(Long.class, t.get(1).getClass());
+        Assert.assertEquals(1.2f, t.get(2));
+        Assert.assertEquals(Float.class, t.get(2).getClass());
         
         // test that implicit casts work
         pigServer.registerQuery("p = load '" + output + "' using BinStorage() " +
@@ -685,12 +684,12 @@ public class TestEvalPipeline extends Te
         pigServer.registerQuery("q = foreach p generate name, age + 1L, (int)gpa;");
         it = pigServer.openIterator("q");
         t = it.next();
-        assertEquals("pigtester", t.get(0));
-        assertEquals(String.class, t.get(0).getClass());
-        assertEquals(11L, t.get(1));
-        assertEquals(Long.class, t.get(1).getClass());
-        assertEquals(1, t.get(2));
-        assertEquals(Integer.class, t.get(2).getClass());
+        Assert.assertEquals("pigtester", t.get(0));
+        Assert.assertEquals(String.class, t.get(0).getClass());
+        Assert.assertEquals(11L, t.get(1));
+        Assert.assertEquals(Long.class, t.get(1).getClass());
+        Assert.assertEquals(1, t.get(2));
+        Assert.assertEquals(Integer.class, t.get(2).getClass());
     }
     
     @Test
@@ -721,10 +720,10 @@ public class TestEvalPipeline extends Te
         Iterator<Tuple> it = pigServer.openIterator("e");
         for(int i = 0; i < resultMap.size(); i++) {
             Tuple t = it.next();
-            assertEquals(true, resultMap.containsKey(t.get(0)));
+            Assert.assertEquals(true, resultMap.containsKey(t.get(0)));
             Pair<Long, Long> output = resultMap.get(t.get(0)); 
-            assertEquals(output.first, t.get(1));
-            assertEquals(output.second, t.get(2));
+            Assert.assertEquals(output.first, t.get(1));
+            Assert.assertEquals(output.second, t.get(2));
         }
     }
     
@@ -739,7 +738,7 @@ public class TestEvalPipeline extends Te
                 + "' using PigStorage() " + "as (name:chararray);");
         Iterator<Tuple> it = pigServer.openIterator("a");
         Tuple t = it.next();
-        assertEquals("wendyξ", t.get(0));
+        Assert.assertEquals("wendyξ", t.get(0));
         
     }
 
@@ -765,26 +764,26 @@ public class TestEvalPipeline extends Te
 
         pigServer.registerQuery(query);
         Iterator<Tuple> iter = pigServer.openIterator("C");
-        if(!iter.hasNext()) fail("No output found");
+        if(!iter.hasNext()) Assert.fail("No output found");
         int numIdentity = 0;
         while(iter.hasNext()){
             Tuple t = iter.next();
-            assertEquals(1.0, (Double)t.get(0), 0.01);
-            assertEquals(2.0, (Float)t.get(1), 0.01);
-            assertTrue(((String)t.get(2)).equals("Hello World!"));
-            assertEquals(new Integer(10), (Integer)t.get(3));
-            assertEquals(1, ((DataBag)t.get(4)).size());
-            assertEquals(4, ((Tuple)t.get(5)).size());
-            assertEquals(2, ((Map<String, Object>)t.get(6)).size());
-            assertEquals(DataByteArray.class, t.get(7).getClass());
-            assertEquals(8, t.size());
+            Assert.assertEquals(1.0, (Double)t.get(0), 0.01);
+            Assert.assertEquals(2.0, (Float)t.get(1), 0.01);
+            Assert.assertTrue(((String)t.get(2)).equals("Hello World!"));
+            Assert.assertEquals(new Integer(10), (Integer)t.get(3));
+            Assert.assertEquals(1, ((DataBag)t.get(4)).size());
+            Assert.assertEquals(4, ((Tuple)t.get(5)).size());
+            Assert.assertEquals(2, ((Map<String, Object>)t.get(6)).size());
+            Assert.assertEquals(DataByteArray.class, t.get(7).getClass());
+            Assert.assertEquals(8, t.size());
             ++numIdentity;
         }
-        assertEquals(LOOP_COUNT * LOOP_COUNT, numIdentity);
+        Assert.assertEquals(LOOP_COUNT * LOOP_COUNT, numIdentity);
     }
 
     @Test
-    public void testMapUDFFail() throws Exception{
+    public void testMapUDFfail() throws Exception{
         int LOOP_COUNT = 2;
         File tmpFile = Util.createTempFileDelOnExit("test", "txt");
         PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
@@ -806,7 +805,7 @@ public class TestEvalPipeline extends Te
         pigServer.registerQuery(query);
         try {
             pigServer.openIterator("C");
-            fail("Error expected.");
+            Assert.fail("Error expected.");
         } catch (Exception e) {
             e.getMessage().contains("Cannot determine");
         }
@@ -824,8 +823,8 @@ public class TestEvalPipeline extends Te
         pigServer.registerQuery("b = foreach a generate (chararray)$0, (chararray)$1;");
         Iterator<Tuple> it = pigServer.openIterator("b");
         Tuple t = it.next();
-        assertEquals("hello", t.get(0));
-        assertEquals("world", t.get(1));
+        Assert.assertEquals("hello", t.get(0));
+        Assert.assertEquals("world", t.get(1));
         
     }
 
@@ -855,20 +854,20 @@ public class TestEvalPipeline extends Te
 
         pigServer.registerQuery(query);
         Iterator<Tuple> iter = pigServer.openIterator("C");
-        if(!iter.hasNext()) fail("No output found");
+        if(!iter.hasNext()) Assert.fail("No output found");
         int numIdentity = 0;
         while(iter.hasNext()){
             Tuple t = iter.next();
-            assertEquals((Integer)((numIdentity + 1) * 10), (Integer)t.get(0));
-            assertEquals((Long)10L, (Long)t.get(1));
-            assertEquals((Long)5L, (Long)t.get(2));
-            assertEquals(LOOP_COUNT*2.0, (Double)t.get(3), 0.01);
-            assertEquals(8.0, (Double)t.get(5), 0.01);
-            assertEquals(5L, ((DataBag)t.get(6)).size());
-            assertEquals(7, t.size());
+            Assert.assertEquals((Integer)((numIdentity + 1) * 10), (Integer)t.get(0));
+            Assert.assertEquals((Long)10L, (Long)t.get(1));
+            Assert.assertEquals((Long)5L, (Long)t.get(2));
+            Assert.assertEquals(LOOP_COUNT*2.0, (Double)t.get(3), 0.01);
+            Assert.assertEquals(8.0, (Double)t.get(5), 0.01);
+            Assert.assertEquals(5L, ((DataBag)t.get(6)).size());
+            Assert.assertEquals(7, t.size());
             ++numIdentity;
         }
-        assertEquals(LOOP_COUNT, numIdentity);
+        Assert.assertEquals(LOOP_COUNT, numIdentity);
     }
 
     @Test
@@ -896,26 +895,26 @@ public class TestEvalPipeline extends Te
 
         pigServer.registerQuery(query);
         Iterator<Tuple> iter = pigServer.openIterator("C");
-        if(!iter.hasNext()) fail("No output found");
+        if(!iter.hasNext()) Assert.fail("No output found");
 
         int numRows = 0;
         for(int i = 0; i < LOOP_COUNT; i++) {
             for(int j = 0; j < LOOP_COUNT; j+=2){
                 Tuple t = null;
                 if(iter.hasNext()) t = iter.next();
-                assertEquals(3, t.size());
-                assertEquals(new Double(j - i), (Double)t.get(0), 0.01);
-                assertEquals((Integer)(j%2), (Integer)t.get(1));
+                Assert.assertEquals(3, t.size());
+                Assert.assertEquals(new Double(j - i), (Double)t.get(0), 0.01);
+                Assert.assertEquals((Integer)(j%2), (Integer)t.get(1));
                 if(j == 0) {
-                    assertEquals(0.0, (Double)t.get(2), 0.01);
+                    Assert.assertEquals(0.0, (Double)t.get(2), 0.01);
                 } else {
-                    assertEquals((Double)((double)i/j), (Double)t.get(2), 0.01);
+                    Assert.assertEquals((Double)((double)i/j), (Double)t.get(2), 0.01);
                 }
                 ++numRows;
             }
         }
 
-        assertEquals((LOOP_COUNT * LOOP_COUNT)/2, numRows);
+        Assert.assertEquals((LOOP_COUNT * LOOP_COUNT)/2, numRows);
     }
 
     @Test
@@ -942,21 +941,21 @@ public class TestEvalPipeline extends Te
 
         pigServer.registerQuery(query);
         Iterator<Tuple> iter = pigServer.openIterator("C");
-        if(!iter.hasNext()) fail("No output found");
+        if(!iter.hasNext()) Assert.fail("No output found");
 
         int numRows = 0;
         for(int i = 0; i < LOOP_COUNT; i++) {
             for(int j = 0; j < LOOP_COUNT; j+=2){
                 Tuple t = null;
                 if(iter.hasNext()) t = iter.next();
-                assertEquals(2, t.size());
-                assertEquals(new Double(i + j), (Double)t.get(0), 0.01);
-                assertEquals(new Double(i + j + i), (Double)t.get(1));
+                Assert.assertEquals(2, t.size());
+                Assert.assertEquals(new Double(i + j), (Double)t.get(0), 0.01);
+                Assert.assertEquals(new Double(i + j + i), (Double)t.get(1));
                 ++numRows;
             }
         }
 
-        assertEquals((LOOP_COUNT * LOOP_COUNT)/2, numRows);
+        Assert.assertEquals((LOOP_COUNT * LOOP_COUNT)/2, numRows);
     }
 
     @Test
@@ -978,20 +977,20 @@ public class TestEvalPipeline extends Te
         pigServer.registerQuery("C = foreach B generate FLATTEN(" + Identity.class.getName() + "($0, $1));"); //the argument does not matter
 
         Iterator<Tuple> iter = pigServer.openIterator("C");
-        if(!iter.hasNext()) fail("No output found");
+        if(!iter.hasNext()) Assert.fail("No output found");
         int numRows = 0;
         for(int i = 0; i < LOOP_COUNT; i++) {
             for(int j = 0; j < LOOP_COUNT; j+=2){
                 Tuple t = null;
                 if(iter.hasNext()) t = iter.next();
-                assertEquals(2, t.size());
-                assertEquals(new Double(i), new Double(t.get(0).toString()), 0.01);
-                assertEquals(new Double(j), new Double(t.get(1).toString()), 0.01);
+                Assert.assertEquals(2, t.size());
+                Assert.assertEquals(new Double(i), new Double(t.get(0).toString()), 0.01);
+                Assert.assertEquals(new Double(j), new Double(t.get(1).toString()), 0.01);
                 ++numRows;
             }
         }
 
-        assertEquals((LOOP_COUNT * LOOP_COUNT)/2, numRows);
+        Assert.assertEquals((LOOP_COUNT * LOOP_COUNT)/2, numRows);
     }
     
     @Test
@@ -1044,7 +1043,7 @@ public class TestEvalPipeline extends Te
             Tuple expected = results.get((String)fields.get(0));
             int i = 0;
             for (Object field : fields) {
-                assertEquals(expected.get(i++), field);
+                Assert.assertEquals(expected.get(i++), field);
             }
         }
         
@@ -1087,7 +1086,7 @@ public class TestEvalPipeline extends Te
         while(it.hasNext()) {
             Tuple tup = it.next();
             Long resultBagSize = (Long)tup.get(0);
-            assertTrue(DataType.compare(expectedBag.size(), resultBagSize) == 0);
+            Assert.assertTrue(DataType.compare(expectedBag.size(), resultBagSize) == 0);
         }
         
         Util.deleteFile(cluster, "table");        
@@ -1128,7 +1127,7 @@ public class TestEvalPipeline extends Te
             Tuple tup = it.next();
             String resultString = (String)tup.get(0);
             String expectedString = stringArray[counter];
-          	assertTrue(expectedString.equals(resultString));
+          	Assert.assertTrue(expectedString.equals(resultString));
             ++counter;
         }
         Util.deleteFile(cluster, "table");