Posted to commits@pig.apache.org by pr...@apache.org on 2014/11/27 13:50:02 UTC

svn commit: r1642132 [8/14] - in /pig/branches/spark: ./ bin/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/datetime/convert/ contrib/piggybank/java/sr...

Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/rules/ConstantCalculator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/rules/ConstantCalculator.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/rules/ConstantCalculator.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/rules/ConstantCalculator.java Thu Nov 27 12:49:54 2014
@@ -26,6 +26,7 @@ import java.util.Map;
 
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.PigContext;
@@ -48,6 +49,7 @@ import org.apache.pig.newplan.logical.op
 import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
 import org.apache.pig.newplan.optimizer.Rule;
 import org.apache.pig.newplan.optimizer.Transformer;
+import org.joda.time.DateTimeZone;
 
 public abstract class ConstantCalculator extends Rule {
     private List<LogicalRelationalOperator> processedOperators = new ArrayList<LogicalRelationalOperator>();
@@ -107,6 +109,7 @@ public abstract class ConstantCalculator
         public static class ConstantCalculatorExpressionVisitor extends AllSameExpressionVisitor {
             private LogicalRelationalOperator currentOp;
             private PigContext pc;
+            private DateTimeZone currentDTZ = null;
             public ConstantCalculatorExpressionVisitor(OperatorPlan expPlan,
                     LogicalRelationalOperator currentOp, PigContext pc) throws FrontendException {
                 super(expPlan, new ReverseDependencyOrderWalkerWOSeenChk(expPlan));
@@ -148,7 +151,11 @@ public abstract class ConstantCalculator
                     PhysicalOperator root = expPhysicalPlan.getLeaves().get(0);
                     try {
                         UDFContext.getUDFContext().addJobConf(ConfigurationUtil.toConfiguration(pc.getProperties(), true));
+                        PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
+                        PhysicalOperator.setPigLogger(pigHadoopLogger);
+                        setDefaultTimeZone();
                         val = root.getNext(root.getResultType()).result;
+                        restoreDefaultTimeZone();
                         UDFContext.getUDFContext().addJobConf(null);
                     } catch (ExecException e) {
                         throw new FrontendException(e);
@@ -159,7 +166,9 @@ public abstract class ConstantCalculator
                     UserFuncExpression udf = (UserFuncExpression)op;
                     try {
                         UDFContext.getUDFContext().addJobConf(ConfigurationUtil.toConfiguration(pc.getProperties(), true));
+                        setDefaultTimeZone();
                         val = udf.getEvalFunc().exec(null);
+                        restoreDefaultTimeZone();
                         UDFContext.getUDFContext().addJobConf(null);
                     } catch (IOException e) {
                         throw new FrontendException(e);
@@ -173,6 +182,21 @@ public abstract class ConstantCalculator
                     currentWalker.getPlan().replace(op, constantExpr);
                 }
             }
+
+            private void setDefaultTimeZone() {
+                String dtzStr = pc.getProperties().getProperty("pig.datetime.default.tz");
+                if (dtzStr != null && dtzStr.length() > 0) {
+                    currentDTZ = DateTimeZone.getDefault();
+                    DateTimeZone.setDefault(DateTimeZone.forID(dtzStr));
+                }
+            }
+
+            private void restoreDefaultTimeZone() {
+                if (currentDTZ != null) {
+                    DateTimeZone.setDefault(currentDTZ);
+                    currentDTZ = null;
+                }
+            }
         }
 
         @Override

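For context on the change above: the new setDefaultTimeZone()/restoreDefaultTimeZone() pair temporarily swaps Joda-Time's process-wide default zone (from the pig.datetime.default.tz property) around constant folding, so datetime UDFs evaluate with the same zone they would see at runtime. As written, restoreDefaultTimeZone() is only reached when evaluation succeeds; a minimal sketch of the same idea with the restore guaranteed, reusing the helpers from the diff (not part of the commit):

    // Sketch only: the restore runs even if evaluation throws.
    setDefaultTimeZone();
    try {
        val = root.getNext(root.getResultType()).result;
    } finally {
        restoreDefaultTimeZone();
    }
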
Modified: pig/branches/spark/src/org/apache/pig/parser/AliasMasker.g
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/AliasMasker.g?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/parser/AliasMasker.g (original)
+++ pig/branches/spark/src/org/apache/pig/parser/AliasMasker.g Thu Nov 27 12:49:54 2014
@@ -227,7 +227,7 @@ bag_type
     : ^( BAG_TYPE IDENTIFIER? tuple_type? )
 ;
 
-map_type : ^( MAP_TYPE type? )
+map_type : ^( MAP_TYPE IDENTIFIER? type? )
 ;
 
 func_clause

Modified: pig/branches/spark/src/org/apache/pig/parser/AstPrinter.g
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/AstPrinter.g?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/parser/AstPrinter.g (original)
+++ pig/branches/spark/src/org/apache/pig/parser/AstPrinter.g Thu Nov 27 12:49:54 2014
@@ -222,7 +222,7 @@ bag_type
     : ^( BAG_TYPE { sb.append("bag{"); } ( { sb.append("T:"); } IDENTIFIER? tuple_type )? ) { sb.append("}"); }
 ;
 
-map_type : ^( MAP_TYPE { sb.append("map["); } type? ) { sb.append("]"); }
+map_type : ^( MAP_TYPE { sb.append("map["); } IDENTIFIER? type? ) { sb.append("]"); }
 ;
 
 func_clause

Modified: pig/branches/spark/src/org/apache/pig/parser/AstValidator.g
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/AstValidator.g?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/parser/AstValidator.g (original)
+++ pig/branches/spark/src/org/apache/pig/parser/AstValidator.g Thu Nov 27 12:49:54 2014
@@ -276,7 +276,7 @@ tuple_type : ^( TUPLE_TYPE field_def_lis
 bag_type : ^( BAG_TYPE IDENTIFIER? tuple_type? )
 ;
 
-map_type : ^( MAP_TYPE type? )
+map_type : ^( MAP_TYPE IDENTIFIER? type? )
 ;
 
 func_clause : ^( FUNC_REF func_name )

Modified: pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java (original)
+++ pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java Thu Nov 27 12:49:54 2014
@@ -100,6 +100,8 @@ import org.apache.pig.newplan.logical.ru
 import org.apache.pig.newplan.logical.visitor.ProjStarInUdfExpander;
 import org.apache.pig.newplan.logical.visitor.ProjectStarExpander;
 import org.apache.pig.newplan.logical.visitor.ResetProjectionAttachedRelationalOpVisitor;
+import org.apache.pig.validator.BlackAndWhitelistFilter;
+import org.apache.pig.validator.PigCommandFilter;
 
 public class LogicalPlanBuilder {
 
@@ -123,6 +125,8 @@ public class LogicalPlanBuilder {
     private int storeIndex = 0;
     private int loadIndex = 0;
 
+    private final BlackAndWhitelistFilter filter;
+
     private static NodeIdGenerator nodeIdGen = NodeIdGenerator.getGenerator();
 
     public static long getNextId(String scope) {
@@ -135,6 +139,7 @@ public class LogicalPlanBuilder {
         this.scope = scope;
         this.fileNameMap = fileNameMap;
         this.intStream = input;
+        this.filter = new BlackAndWhitelistFilter(this.pigContext);
     }
 
     LogicalPlanBuilder(IntStream input) throws ExecException {
@@ -143,6 +148,7 @@ public class LogicalPlanBuilder {
         this.scope = "test";
         this.fileNameMap = new HashMap<String, String>();
         this.intStream = input;
+        this.filter = new BlackAndWhitelistFilter(this.pigContext);
     }
 
     Operator lookupOperator(String alias) {
@@ -158,10 +164,20 @@ public class LogicalPlanBuilder {
     }
 
     void defineCommand(String alias, StreamingCommand command) {
+        try {
+            filter.validate(PigCommandFilter.Command.DEFINE);
+        } catch (FrontendException e) {
+            throw new RuntimeException(e);
+        }
         pigContext.registerStreamCmd( alias, command );
     }
 
     void defineFunction(String alias, FuncSpec fs) {
+        try {
+            filter.validate(PigCommandFilter.Command.DEFINE);
+        } catch (FrontendException e) {
+            throw new RuntimeException(e);
+        }
         pigContext.registerFunction( alias, fs );
     }
 

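The defineCommand()/defineFunction() hooks above consult BlackAndWhitelistFilter, which throws a FrontendException when the DEFINE command is disabled for the deployment. A minimal sketch of exercising the filter directly; the "pig.blacklist" property key and the PigContext constructor used here are assumptions, not confirmed by this diff:

    import java.util.Properties;

    import org.apache.pig.ExecType;
    import org.apache.pig.impl.PigContext;
    import org.apache.pig.validator.BlackAndWhitelistFilter;
    import org.apache.pig.validator.PigCommandFilter;

    public class BlacklistDemo {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.setProperty("pig.blacklist", "define");      // assumed property key
            PigContext ctx = new PigContext(ExecType.LOCAL, props);
            BlackAndWhitelistFilter filter = new BlackAndWhitelistFilter(ctx);
            filter.validate(PigCommandFilter.Command.DEFINE);  // should throw FrontendException
        }
    }
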
Modified: pig/branches/spark/src/org/apache/pig/parser/LogicalPlanGenerator.g
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/LogicalPlanGenerator.g?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/parser/LogicalPlanGenerator.g (original)
+++ pig/branches/spark/src/org/apache/pig/parser/LogicalPlanGenerator.g Thu Nov 27 12:49:54 2014
@@ -430,24 +430,19 @@ tuple_type returns[LogicalSchema logical
 bag_type returns[LogicalSchema logicalSchema]
  : ^( BAG_TYPE IDENTIFIER? tuple_type? )
    {
-       if ($tuple_type.logicalSchema!=null && $tuple_type.logicalSchema.size()==1 && $tuple_type.logicalSchema.getField(0).type==DataType.TUPLE) {
-           $logicalSchema = $tuple_type.logicalSchema;
-       }
-       else {
-           LogicalSchema s = new LogicalSchema();
-           s.addField(new LogicalFieldSchema($IDENTIFIER.text, $tuple_type.logicalSchema, DataType.TUPLE));
-           $logicalSchema = s;
-       }
+       LogicalSchema s = new LogicalSchema();
+       s.addField(new LogicalFieldSchema($IDENTIFIER.text, $tuple_type.logicalSchema, DataType.TUPLE));
+       $logicalSchema = s;
    }
 ;
 
 map_type returns[LogicalSchema logicalSchema]
- : ^( MAP_TYPE type? )
+ : ^( MAP_TYPE IDENTIFIER? type? )
    {
        LogicalSchema s = null;
        if( $type.datatype != null ) {
            s = new LogicalSchema();
-           s.addField( new LogicalFieldSchema( null, $type.logicalSchema, $type.datatype ) );
+           s.addField( new LogicalFieldSchema( $IDENTIFIER.text, $type.logicalSchema, $type.datatype ) );
        }
        $logicalSchema = s;
    }

Modified: pig/branches/spark/src/org/apache/pig/parser/QueryParser.g
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/QueryParser.g?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/parser/QueryParser.g (original)
+++ pig/branches/spark/src/org/apache/pig/parser/QueryParser.g Thu Nov 27 12:49:54 2014
@@ -329,7 +329,7 @@ explicit_bag_type : BAG! implicit_bag_ty
 explicit_bag_type_cast : BAG LEFT_CURLY explicit_tuple_type_cast? RIGHT_CURLY -> ^( BAG_TYPE_CAST explicit_tuple_type_cast? )
 ;
 
-implicit_map_type : LEFT_BRACKET type? RIGHT_BRACKET -> ^( MAP_TYPE type? )
+implicit_map_type : LEFT_BRACKET ( ( identifier_plus COLON )? type )? RIGHT_BRACKET -> ^( MAP_TYPE identifier_plus? type? )
 ;
 
 explicit_map_type : MAP! implicit_map_type

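This QueryParser.g change (mirrored in AliasMasker.g, AstPrinter.g, AstValidator.g and LogicalPlanGenerator.g above) lets a map schema carry an optional name for its value field: map[int] remains valid, and map[v:int] now names the value field "v". A minimal sketch, assuming the org.apache.pig.impl.util.Utils.getSchemaFromString helper accepts the new form:

    import org.apache.pig.impl.logicalLayer.schema.Schema;
    import org.apache.pig.impl.util.Utils;

    public class MapSchemaDemo {
        public static void main(String[] args) throws Exception {
            Schema unnamed = Utils.getSchemaFromString("m:map[int]");    // old form, still valid
            Schema named = Utils.getSchemaFromString("m:map[v:int]");    // value field named "v"
            System.out.println(unnamed + " / " + named);
        }
    }
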
Modified: pig/branches/spark/src/org/apache/pig/parser/QueryParserDriver.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/QueryParserDriver.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/parser/QueryParserDriver.java (original)
+++ pig/branches/spark/src/org/apache/pig/parser/QueryParserDriver.java Thu Nov 27 12:49:54 2014
@@ -47,12 +47,15 @@ import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileLocalizer.FetchFileRet;
 import org.apache.pig.impl.io.ResourceNotFoundException;
+import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.logical.relational.LogicalPlan;
 import org.apache.pig.newplan.logical.relational.LogicalSchema;
 import org.apache.pig.parser.QueryParser.literal_return;
 import org.apache.pig.parser.QueryParser.schema_return;
 import org.apache.pig.tools.pigstats.ScriptState;
+import org.apache.pig.validator.BlackAndWhitelistFilter;
+import org.apache.pig.validator.PigCommandFilter;
 
 public class QueryParserDriver {
     private static final Log LOG = LogFactory.getLog(QueryParserDriver.class);
@@ -367,8 +370,17 @@ public class QueryParserDriver {
     private boolean expandImport(Tree ast) throws ParserException {
         List<CommonTree> nodes = new ArrayList<CommonTree>();
         traverseImport(ast, nodes);
-        if (nodes.isEmpty()) return false;
+        if (nodes.isEmpty())
+            return false;
 
+        // Validate if imports are enabled/disabled
+        final BlackAndWhitelistFilter filter = new BlackAndWhitelistFilter(
+                this.pigContext);
+        try {
+            filter.validate(PigCommandFilter.Command.IMPORT);
+        } catch (FrontendException e) {
+            throw new ParserException(e.getMessage());
+        }
         for (CommonTree t : nodes) {
             macroImport(t);
         }
@@ -562,6 +574,7 @@ public class QueryParserDriver {
 
             String macroText = null;
             try {
+                in.close();
                 in = new BufferedReader(new StringReader(sb.toString()));
                 macroText = pigContext.doParamSubstitution(in);
             } catch (IOException e) {

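In the macro-import path of QueryParserDriver above, the added in.close() releases the reader over the macro file before the variable is rebound to a StringReader-backed reader for parameter substitution; without it, the original file handle would leak. The pattern in isolation (file name hypothetical):

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.StringReader;

    public class RebindDemo {
        public static void main(String[] args) throws Exception {
            StringBuilder sb = new StringBuilder();
            BufferedReader in = new BufferedReader(new FileReader("macro.pig")); // hypothetical file
            String line;
            while ((line = in.readLine()) != null) {
                sb.append(line).append("\n");
            }
            in.close();                                                // close the file-backed reader first
            in = new BufferedReader(new StringReader(sb.toString()));  // then rebind
            System.out.println(in.readLine());
            in.close();
        }
    }
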
Modified: pig/branches/spark/src/org/apache/pig/scripting/Pig.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/scripting/Pig.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/scripting/Pig.java (original)
+++ pig/branches/spark/src/org/apache/pig/scripting/Pig.java Thu Nov 27 12:49:54 2014
@@ -120,7 +120,7 @@ public class Pig {
      */
     public static void registerUDF(String udffile, String namespace)
             throws IOException {
-        LOG.info("Register script UFD file: "+ udffile);
+        LOG.info("Register script UDF file: "+ udffile);
         ScriptPigContext ctx = getScriptContext();
         ScriptEngine engine = ctx.getScriptEngine();
         // script file contains only functions, no need to separate
@@ -349,13 +349,16 @@ public class Pig {
     private static String getScriptFromFile(String filename) throws IOException {
         LineNumberReader rd = new LineNumberReader(new FileReader(filename));
         StringBuilder sb = new StringBuilder();
-        String line = rd.readLine();
-        while (line != null) {
-            sb.append(line);
-            sb.append("\n");
-            line = rd.readLine();
+        try {
+            String line = rd.readLine();
+            while (line != null) {
+                sb.append(line);
+                sb.append("\n");
+                line = rd.readLine();
+            }
+        } finally {
+            rd.close();
         }
-        rd.close();
         return sb.toString();
     }
 

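The try/finally above guarantees the reader is closed even when readLine() throws. On Java 7+ the same guarantee reads more compactly with try-with-resources; a sketch of the equivalent (not part of the commit):

    import java.io.FileReader;
    import java.io.IOException;
    import java.io.LineNumberReader;

    public class ReadFileDemo {
        // Same contract as getScriptFromFile above, with close() handled automatically.
        static String getScriptFromFile(String filename) throws IOException {
            StringBuilder sb = new StringBuilder();
            try (LineNumberReader rd = new LineNumberReader(new FileReader(filename))) {
                String line;
                while ((line = rd.readLine()) != null) {
                    sb.append(line).append("\n");
                }
            }
            return sb.toString();
        }
    }
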
Modified: pig/branches/spark/src/org/apache/pig/scripting/ScriptingOutputCapturer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/scripting/ScriptingOutputCapturer.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/scripting/ScriptingOutputCapturer.java (original)
+++ pig/branches/spark/src/org/apache/pig/scripting/ScriptingOutputCapturer.java Thu Nov 27 12:49:54 2014
@@ -32,6 +32,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.ExecType;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
+import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.util.UDFContext;
 
 import com.google.common.base.Charsets;
@@ -100,7 +101,7 @@ public class ScriptingOutputCapturer {
         log.debug("TaskId: " + taskId);
         log.debug("hadoopLogDir: " + hadoopLogDir);
 
-        if (execType.isLocal()) {
+        if (execType.isLocal() || conf.getBoolean(PigImplConstants.CONVERTED_TO_FETCH, false)) {
             String logDir = System.getProperty("pig.udf.scripting.log.dir");
             if (logDir == null)
                 logDir = ".";

Modified: pig/branches/spark/src/org/apache/pig/scripting/jython/JythonScriptEngine.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/scripting/jython/JythonScriptEngine.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/scripting/jython/JythonScriptEngine.java (original)
+++ pig/branches/spark/src/org/apache/pig/scripting/jython/JythonScriptEngine.java Thu Nov 27 12:49:54 2014
@@ -300,7 +300,7 @@ public class JythonScriptEngine extends 
                                 // determine the relative path that the file should have in the jar
                                 int pos = apath.lastIndexOf(File.separatorChar + name.replace('.', File.separatorChar));
                                 if (pos > 0) {
-                                    files.put(apath.substring(pos), apath);
+                                    files.put(apath.substring(pos + 1), apath);
                                 } else {
                                     files.put(apath, apath);
                                 }
@@ -409,13 +409,15 @@ public class JythonScriptEngine extends 
             throw new IOException("Can't read file: " + scriptFile);
         }
 
-        // TODO: fis1 is not closed
         FileInputStream fis1 = new FileInputStream(scriptFile);
-        if (hasFunction(fis1)) {
-            registerFunctions(scriptFile, null, pigContext);
+        try {
+            if (hasFunction(fis1)) {
+                registerFunctions(scriptFile, null, pigContext);
+            }
+        } finally {
+            fis1.close();
         }
 
-        
         Interpreter.setMain(true);
         FileInputStream fis = new FileInputStream(scriptFile);
         try {

Modified: pig/branches/spark/src/org/apache/pig/scripting/streaming/python/PythonScriptEngine.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/scripting/streaming/python/PythonScriptEngine.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/scripting/streaming/python/PythonScriptEngine.java (original)
+++ pig/branches/spark/src/org/apache/pig/scripting/streaming/python/PythonScriptEngine.java Thu Nov 27 12:49:54 2014
@@ -56,7 +56,12 @@ public class PythonScriptEngine extends 
         }
         
         FileInputStream fin = new FileInputStream(f);
-        List<String[]> functions = getFunctions(fin);
+        List<String[]> functions = null;
+        try {
+            functions = getFunctions(fin);
+        } finally {
+            fin.close();
+        }
         namespace = namespace == null ? "" : namespace + NAMESPACE_SEPARATOR;
         for(String[] functionInfo : functions) {
             String name = functionInfo[0];
@@ -75,7 +80,6 @@ public class PythonScriptEngine extends 
                                                     execType, isIllustrate
                                         }));
         }
-        fin.close();
     }
 
     @Override

Modified: pig/branches/spark/src/org/apache/pig/tools/grunt/GruntParser.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/grunt/GruntParser.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/grunt/GruntParser.java Thu Nov 27 12:49:54 2014
@@ -45,6 +45,7 @@ import java.util.Set;
 import jline.ConsoleReader;
 import jline.ConsoleReaderInputStream;
 
+import org.apache.commons.io.output.NullOutputStream;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
@@ -354,7 +355,7 @@ public class GruntParser extends PigScri
                                   List<String> params, List<String> files,
                                   boolean dontPrintOutput)
         throws IOException, ParseException {
-
+        filter.validate(PigCommandFilter.Command.EXPLAIN);
         if (null != mExplain) {
             return;
         }
@@ -390,26 +391,9 @@ public class GruntParser extends PigScri
         explainCurrentBatch(false);
     }
 
-    /**
-     * A {@link PrintStream} implementation which does not write anything
-     * Used with '-check' command line option to pig Main
-     * (through {@link GruntParser#explainCurrentBatch(boolean) } )
-     */
-    static class NullPrintStream extends PrintStream {
-        public NullPrintStream(String fileName) throws FileNotFoundException {
-            super(fileName);
-        }
-        @Override
-        public void write(byte[] buf, int off, int len) {}
-        @Override
-        public void write(int b) {}
-        @Override
-        public void write(byte [] b) {}
-    }
-
     protected void explainCurrentBatch(boolean dontPrintOutput) throws IOException {
-        PrintStream lp = (dontPrintOutput) ? new NullPrintStream("dummy") : System.out;
-        PrintStream ep = (dontPrintOutput) ? new NullPrintStream("dummy") : System.out;
+        PrintStream lp = (dontPrintOutput) ? new PrintStream(new NullOutputStream()) : System.out;
+        PrintStream ep = (dontPrintOutput) ? new PrintStream(new NullOutputStream()) : System.out;
 
         if (!(mExplain.mLast && mExplain.mCount == 0)) {
             if (mPigServer.isBatchEmpty()) {
@@ -489,13 +473,20 @@ public class GruntParser extends PigScri
 
         PigContext context = mPigServer.getPigContext();
         BufferedReader reader = new BufferedReader(new FileReader(scriptPath));
-        return context.doParamSubstitution(reader, params, paramFiles);
+        String result = context.doParamSubstitution(reader, params, paramFiles);
+        reader.close();
+        return result;
     }
 
     @Override
     protected void processScript(String script, boolean batch,
                                  List<String> params, List<String> files)
         throws IOException, ParseException {
+        if(batch) {
+            filter.validate(PigCommandFilter.Command.EXEC);
+        } else {
+            filter.validate(PigCommandFilter.Command.RUN);
+        }
 
         if(mExplain == null) { // process only if not in "explain" mode
             if (script == null) {
@@ -1203,10 +1194,16 @@ public class GruntParser extends PigScri
     }
 
     public static int runSQLCommand(String hcatBin, String cmd, boolean mInteractive) throws IOException {
-        String[] tokens = new String[3];
-        tokens[0] = hcatBin;
-        tokens[1] = "-e";
-        tokens[2] = cmd.substring(cmd.indexOf("sql")).substring(4);
+        List<String> tokensList = new ArrayList<String>();
+        if (hcatBin.endsWith(".py")) {
+            tokensList.add("python");
+            tokensList.add(hcatBin);
+        } else {
+            tokensList.add(hcatBin);
+        }
+        tokensList.add("-e");
+        tokensList.add(cmd.substring(cmd.indexOf("sql")).substring(4).replaceAll("\n", " "));
+        String[] tokens = tokensList.toArray(new String[0]);
 
         // create new environment = environment - HADOOP_CLASSPATH
         // This is because of antlr version conflict between Pig and Hive
@@ -1218,7 +1215,7 @@ public class GruntParser extends PigScri
             }
         }
 
-        log.info("Going to run hcat command: " + tokens[2]);
+        log.info("Going to run hcat command: " + tokens[tokens.length-1]);
         Process executor = Runtime.getRuntime().exec(tokens, envSet.toArray(new String[0]));
         StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), null, System.out);
         StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), null, System.err);

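On the NullPrintStream removal above: the old class discarded writes but, via super(fileName), still created a real file named "dummy" in the working directory. commons-io's NullOutputStream drops bytes without touching the filesystem. In isolation:

    import java.io.PrintStream;

    import org.apache.commons.io.output.NullOutputStream;

    public class NullStreamDemo {
        public static void main(String[] args) {
            PrintStream devNull = new PrintStream(new NullOutputStream());
            devNull.println("never seen");   // silently discarded; no file is created
            devNull.close();
        }
    }
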
Modified: pig/branches/spark/src/org/apache/pig/tools/parameters/PigFileParser.jj
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/parameters/PigFileParser.jj?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/parameters/PigFileParser.jj (original)
+++ pig/branches/spark/src/org/apache/pig/tools/parameters/PigFileParser.jj Thu Nov 27 12:49:54 2014
@@ -287,7 +287,7 @@ void Parse() throws IOException : {}
 void input() throws IOException  :
 {
     String s;
-    Token strTok;
+    Token strTok = null;
 }
 {
     strTok = <PIG>
@@ -296,14 +296,16 @@ void input() throws IOException  :
         out.append(strTok.image );
     }
     |
-    <DECLARE> 
+    strTok = <DECLARE> 
     (
         param_value(true) // overwrite=true
+        { pc.validate(strTok.toString()); }
     )
     |
-    <PIGDEFAULT>
+    strTok = <PIGDEFAULT>
     (
         param_value(false) // overwrite=false
+        { pc.validate(strTok.toString()); }
     )
     |
     s = paramString(){}

Modified: pig/branches/spark/src/org/apache/pig/tools/parameters/PreprocessorContext.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/parameters/PreprocessorContext.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/parameters/PreprocessorContext.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/parameters/PreprocessorContext.java Thu Nov 27 12:49:54 2014
@@ -36,6 +36,11 @@ import java.util.regex.Pattern;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.util.Shell;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.validator.BlackAndWhitelistFilter;
+import org.apache.pig.validator.PigCommandFilter;
+import org.python.google.common.base.Preconditions;
 
 public class PreprocessorContext {
 
@@ -44,6 +49,8 @@ public class PreprocessorContext {
     // used internally to detect when a param is set multiple times,
     // but it set with the same value so it's ok not to log a warning
     private Map<String, String> param_source;
+    
+    private PigContext pigContext;
 
     public Map<String, String> getParamVal() {
         return param_val;
@@ -65,6 +72,10 @@ public class PreprocessorContext {
         param_source = new Hashtable<String, String>(paramVal);
     }
 
+    public void setPigContext(PigContext context) {
+        this.pigContext = context;
+    }
+
     /*
     public  void processLiteral(String key, String val) {
         processLiteral(key, val, true);
@@ -76,7 +87,7 @@ public class PreprocessorContext {
      * @param key - parameter name
      * @param val - string containing command to be executed
      */
-    public  void processShellCmd(String key, String val)  throws ParameterSubstitutionException {
+    public  void processShellCmd(String key, String val)  throws ParameterSubstitutionException, FrontendException {
         processShellCmd(key, val, true);
     }
 
@@ -112,13 +123,18 @@ public class PreprocessorContext {
      * @param key - parameter name
      * @param val - string containing command to be executed
      */
-    public  void processShellCmd(String key, String val, Boolean overwrite)  throws ParameterSubstitutionException {
+    public  void processShellCmd(String key, String val, Boolean overwrite)  throws ParameterSubstitutionException, FrontendException {
+        if (pigContext != null) {
+            BlackAndWhitelistFilter filter = new BlackAndWhitelistFilter(pigContext);
+            filter.validate(PigCommandFilter.Command.SH);
+        }
 
         if (param_val.containsKey(key)) {
             if (param_source.get(key).equals(val) || !overwrite) {
                 return;
             } else {
-                log.warn("Warning : Multiple values found for " + key + ". Using value " + val);
+                log.warn("Warning : Multiple values found for " + key
+                        + ". Using value " + val);
             }
         }
 
@@ -130,6 +146,25 @@ public class PreprocessorContext {
         param_val.put(key, sub_val);
     }
 
+    public void validate(String preprocessorCmd) throws FrontendException {
+        if (pigContext == null) {
+            return;
+        }
+
+        final BlackAndWhitelistFilter filter = new BlackAndWhitelistFilter(pigContext);
+        final String declareToken = "%declare";
+        final String defaultToken = "%default";
+
+        if (preprocessorCmd.toLowerCase().equals(declareToken)) {
+            filter.validate(PigCommandFilter.Command.DECLARE);
+        } else if (preprocessorCmd.toLowerCase().equals(defaultToken)) {
+            filter.validate(PigCommandFilter.Command.DEFAULT);
+        } else {
+            throw new IllegalArgumentException("Pig Internal Error. Invalid preprocessor command specified : "
+                            + preprocessorCmd);
+        }
+    }
+    
     /**
      * This method generates value for the specified key by
      * performing substitution if needed within the value first.

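The new validate() above receives the raw token image ("%declare" or "%default") captured in PigFileParser.jj and maps it to the corresponding PigCommandFilter.Command. A behaviorally identical sketch of the dispatch using equalsIgnoreCase instead of toLowerCase().equals(...):

    // Sketch only; same behavior as the validate() method in the diff above.
    public void validate(String preprocessorCmd) throws FrontendException {
        if (pigContext == null) {
            return;
        }
        BlackAndWhitelistFilter filter = new BlackAndWhitelistFilter(pigContext);
        if ("%declare".equalsIgnoreCase(preprocessorCmd)) {
            filter.validate(PigCommandFilter.Command.DECLARE);
        } else if ("%default".equalsIgnoreCase(preprocessorCmd)) {
            filter.validate(PigCommandFilter.Command.DEFAULT);
        } else {
            throw new IllegalArgumentException(
                    "Pig Internal Error. Invalid preprocessor command specified : " + preprocessorCmd);
        }
    }
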
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/JobStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/JobStats.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/JobStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/JobStats.java Thu Nov 27 12:49:54 2014
@@ -21,7 +21,6 @@ package org.apache.pig.tools.pigstats;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -182,17 +181,17 @@ public abstract class JobStats extends O
      * @param durations
      * @return median value
      */
-    protected long calculateMedianValue(long[] durations) {
+    protected long calculateMedianValue(List<Long> durations) {
         long median;
         // figure out the median
-        Arrays.sort(durations);
-        int midPoint = durations.length /2;
-        if ((durations.length & 1) == 1) {
+        Collections.sort(durations);
+        int midPoint = durations.size() /2;
+        if ((durations.size() & 1) == 1) {
             // odd
-            median = durations[midPoint];
+            median = durations.get(midPoint);
         } else {
             // even
-            median = (durations[midPoint-1] + durations[midPoint]) / 2;
+            median = (durations.get(midPoint-1) + durations.get(midPoint)) / 2;
         }
         return median;
     }

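calculateMedianValue above sorts the durations and picks the middle element for an odd count, or averages the two middle elements (integer division) for an even count. For example, [9, 2, 7, 4] sorts to [2, 4, 7, 9]; midPoint = 2, and the even branch returns (4 + 7) / 2 = 5. A standalone illustration of the same logic (not the JobStats method itself):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class MedianDemo {
        static long median(List<Long> durations) {
            Collections.sort(durations);
            int mid = durations.size() / 2;
            return (durations.size() % 2 == 1)
                    ? durations.get(mid)
                    : (durations.get(mid - 1) + durations.get(mid)) / 2;
        }

        public static void main(String[] args) {
            List<Long> d = new ArrayList<Long>(Arrays.asList(9L, 2L, 7L, 4L));
            System.out.println(median(d));   // [2,4,7,9] -> (4+7)/2 = 5
        }
    }
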
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java Thu Nov 27 12:49:54 2014
@@ -18,12 +18,11 @@
 
 package org.apache.pig.tools.pigstats;
 
+import org.apache.pig.PigRunner;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
 import org.apache.pig.impl.plan.OperatorPlan;
 
-import org.apache.pig.PigRunner;
-
 /**
  * Should be implemented by an object that wants to receive notifications
  * from {@link PigRunner}.
@@ -33,7 +32,7 @@ import org.apache.pig.PigRunner;
 public interface PigProgressNotificationListener extends java.util.EventListener {
 
     /**
-     * Invoked before any Hadoop jobs are run with the plan that is to be executed.
+     * Invoked before any Hadoop jobs (or a Tez DAG) are run with the plan that is to be executed.
      *
      * @param scriptId the unique id of the script
      * @param plan the OperatorPlan that is to be executed
@@ -41,36 +40,36 @@ public interface PigProgressNotification
     public void initialPlanNotification(String scriptId, OperatorPlan<?> plan);
 
     /**
-     * Invoked just before launching Hadoop jobs spawned by the script.
+     * Invoked just before launching Hadoop jobs (or tez DAGs) spawned by the script.
      * @param scriptId the unique id of the script
-     * @param numJobsToLaunch the total number of Hadoop jobs spawned by the script
+     * @param numJobsToLaunch the total number of Hadoop jobs (or Tez DAGs) spawned by the script
      */
     public void launchStartedNotification(String scriptId, int numJobsToLaunch);
 
     /**
-     * Invoked just before submitting a batch of Hadoop jobs.
+     * Invoked just before submitting a batch of Hadoop jobs (or Tez DAGs).
      * @param scriptId the unique id of the script
-     * @param numJobsSubmitted the number of Hadoop jobs in the batch
+     * @param numJobsSubmitted the number of Hadoop jobs (or Tez DAGs) in the batch
      */
     public void jobsSubmittedNotification(String scriptId, int numJobsSubmitted);
 
     /**
-     * Invoked after a Hadoop job is started.
-     * @param scriptId the unique id of the script 
-     * @param assignedJobId the Hadoop job id
+     * Invoked after a Hadoop job (or Tez DAG) is started.
+     * @param scriptId the unique id of the script
+     * @param assignedJobId the Hadoop job id (or Tez DAG job id)
      */
     public void jobStartedNotification(String scriptId, String assignedJobId);
 
     /**
-     * Invoked just after a Hadoop job is completed successfully. 
-     * @param scriptId the unique id of the script 
-     * @param jobStats the {@link JobStats} object associated with the Hadoop job
+     * Invoked just after a Hadoop job (or Tez DAG) is completed successfully.
+     * @param scriptId the unique id of the script
+     * @param jobStats the {@link JobStats} object associated with the Hadoop job (or Tez DAG)
      */
     public void jobFinishedNotification(String scriptId, JobStats jobStats);
 
     /**
      * Invoked when a Hadoop job fails.
-     * @param scriptId the unique id of the script 
+     * @param scriptId the unique id of the script
      * @param jobStats the {@link JobStats} object associated with the Hadoop job
      */
     public void jobFailedNotification(String scriptId, JobStats jobStats);
@@ -83,16 +82,16 @@ public interface PigProgressNotification
     public void outputCompletedNotification(String scriptId, OutputStats outputStats);
 
     /**
-     * Invoked to update the execution progress. 
+     * Invoked to update the execution progress.
      * @param scriptId the unique id of the script
      * @param progress the percentage of the execution progress
      */
     public void progressUpdatedNotification(String scriptId, int progress);
 
     /**
-     * Invoked just after all Hadoop jobs spawned by the script are completed.
+     * Invoked just after all Hadoop jobs (Tez DAGs) spawned by the script are completed.
      * @param scriptId the unique id of the script
-     * @param numJobsSucceeded the total number of Hadoop jobs succeeded
+     * @param numJobsSucceeded the total number of Hadoop jobs (Tez DAGs) succeeded
      */
     public void launchCompletedNotification(String scriptId, int numJobsSucceeded);
 }

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatusReporter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatusReporter.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatusReporter.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatusReporter.java Thu Nov 27 12:49:54 2014
@@ -21,6 +21,8 @@ package org.apache.pig.tools.pigstats;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.StatusReporter;
 import org.apache.hadoop.util.Progressable;
+import org.apache.pig.JVMReuseManager;
+import org.apache.pig.StaticDataCleanup;
 import org.apache.pig.backend.hadoop.executionengine.TaskContext;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
@@ -33,6 +35,15 @@ public class PigStatusReporter extends S
 
     private TaskContext<?> context = null;
 
+    static {
+        JVMReuseManager.getInstance().registerForStaticDataCleanup(PigStatusReporter.class);
+    }
+
+    @StaticDataCleanup
+    public static void staticDataCleanup() {
+        reporter = null;
+    }
+
     private PigStatusReporter() {
     }
 
@@ -46,10 +57,6 @@ public class PigStatusReporter extends S
         return reporter;
     }
 
-    public void destroy() {
-        context = null;
-    }
-
     public void setContext(TaskContext<?> context) {
         this.context = context;
     }

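The PigStatusReporter change above registers the class with JVMReuseManager so its static singleton is reset between tasks when a container (e.g. under Tez) reuses the JVM; the manager invokes the @StaticDataCleanup-annotated method to drop stale state, replacing the removed destroy(). The same pattern applied to a hypothetical class holding static state:

    import org.apache.pig.JVMReuseManager;
    import org.apache.pig.StaticDataCleanup;

    // Hedged sketch: MyCachedLookup is illustrative, not a Pig class.
    public class MyCachedLookup {
        private static MyCachedLookup instance;

        static {
            JVMReuseManager.getInstance().registerForStaticDataCleanup(MyCachedLookup.class);
        }

        @StaticDataCleanup
        public static void staticDataCleanup() {
            instance = null;   // forget per-job state before the JVM is reused
        }
    }
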
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/ScriptState.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/ScriptState.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/ScriptState.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/ScriptState.java Thu Nov 27 12:49:54 2014
@@ -296,7 +296,7 @@ public abstract class ScriptState {
         // restrict the size of the script to be stored in job conf
         int maxScriptSize = 10240;
         if (pigContext != null) {
-            String prop = pigContext.getProperties().getProperty(PigConfiguration.MAX_SCRIPT_SIZE);
+            String prop = pigContext.getProperties().getProperty(PigConfiguration.PIG_SCRIPT_MAX_SIZE);
             if (prop != null) {
                 maxScriptSize = Integer.valueOf(prop);
             }

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java Thu Nov 27 12:49:54 2014
@@ -31,9 +31,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.Counters.Counter;
-import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TaskReport;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapreduce.TaskType;
@@ -70,7 +68,7 @@ public final class MRJobStats extends Jo
     }
 
     public static final String SUCCESS_HEADER = "JobId\tMaps\tReduces\t" +
-            "MaxMapTime\tMinMapTIme\tAvgMapTime\tMedianMapTime\tMaxReduceTime\t" +
+            "MaxMapTime\tMinMapTime\tAvgMapTime\tMedianMapTime\tMaxReduceTime\t" +
             "MinReduceTime\tAvgReduceTime\tMedianReducetime\tAlias\tFeature\tOutputs";
 
     public static final String FAILURE_HEADER = "JobId\tAlias\tFeature\tMessage\tOutputs";
@@ -349,13 +347,13 @@ public final class MRJobStats extends Jo
     }
 
     void addMapReduceStatistics(Job job) {
-        TaskReport[] maps = null;
+        Iterator<TaskReport> maps = null;
         try {
             maps = HadoopShims.getTaskReports(job, TaskType.MAP);
         } catch (IOException e) {
             LOG.warn("Failed to get map task report", e);
         }
-        TaskReport[] reduces = null;
+        Iterator<TaskReport> reduces = null;
         try {
             reduces = HadoopShims.getTaskReports(job, TaskType.REDUCE);
         } catch (IOException e) {
@@ -364,21 +362,22 @@ public final class MRJobStats extends Jo
         addMapReduceStatistics(maps, reduces);
     }
 
-    private TaskStat getTaskStat(TaskReport[] tasks) {
-        int size = tasks.length;
+    private TaskStat getTaskStat(Iterator<TaskReport> tasks) {
+        int size = 0;
         long max = 0;
         long min = Long.MAX_VALUE;
         long median = 0;
         long total = 0;
-        long durations[] = new long[size];
+        List<Long> durations = new ArrayList<Long>();
 
-        for (int i = 0; i < tasks.length; i++) {
-            TaskReport rpt = tasks[i];
+        while(tasks.hasNext()){
+            TaskReport rpt = tasks.next();
             long duration = rpt.getFinishTime() - rpt.getStartTime();
-            durations[i] = duration;
+            durations.add(duration);
             max = (duration > max) ? duration : max;
             min = (duration < min) ? duration : min;
             total += duration;
+            size++;
         }
         long avg = total / size;
 
@@ -387,8 +386,8 @@ public final class MRJobStats extends Jo
         return new TaskStat(size, max, min, avg, median);
     }
 
-    private void addMapReduceStatistics(TaskReport[] maps, TaskReport[] reduces) {
-        if (maps != null && maps.length > 0) {
+    private void addMapReduceStatistics(Iterator<TaskReport> maps, Iterator<TaskReport> reduces) {
+        if (maps != null && maps.hasNext()) {
             TaskStat st = getTaskStat(maps);
             setMapStat(st.size, st.max, st.min, st.avg, st.median);
         } else {
@@ -398,7 +397,7 @@ public final class MRJobStats extends Jo
             }
         }
 
-        if (reduces != null && reduces.length > 0) {
+        if (reduces != null && reduces.hasNext()) {
             TaskStat st = getTaskStat(reduces);
             setReduceStat(st.size, st.max, st.min, st.avg, st.median);
         } else {

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java Thu Nov 27 12:49:54 2014
@@ -20,9 +20,10 @@ package org.apache.pig.tools.pigstats.te
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Collections;
-import java.util.Iterator;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,15 +32,21 @@ import org.apache.pig.LoadFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezOperPlan;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezOperator;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.OperatorPlan;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.newplan.Operator;
 import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.PigProgressNotificationListener;
 import org.apache.pig.tools.pigstats.PigStats;
-import org.apache.pig.tools.pigstats.PigStats.JobGraph;
 import org.apache.pig.tools.pigstats.ScriptState;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 /**
@@ -52,9 +59,8 @@ import com.google.common.collect.Maps;
 public class TezScriptState extends ScriptState {
     private static final Log LOG = LogFactory.getLog(TezScriptState.class);
 
-    private Map<TezOperator, String> featureMap = null;
-    private Map<TezOperator, String> aliasMap = Maps.newHashMap();
-    private Map<TezOperator, String> aliasLocationMap = Maps.newHashMap();
+    private List<PigTezProgressNotificationListener> tezListeners = Lists.newArrayList();
+    private Map<String, TezDAGScriptInfo> dagScriptInfo = Maps.newHashMap();
 
     public TezScriptState(String id) {
         super(id);
@@ -64,13 +70,48 @@ public class TezScriptState extends Scri
         return (TezScriptState) ScriptState.get();
     }
 
-    public void addSettingsToConf(TezOperator tezOp, Configuration conf) {
+    @Override
+    public void registerListener(PigProgressNotificationListener listener) {
+        super.registerListener(listener);
+        if (listener instanceof PigTezProgressNotificationListener) {
+            tezListeners.add((PigTezProgressNotificationListener) listener);
+        }
+    }
+
+    public void dagLaunchNotification(String dagName, OperatorPlan<?> dagPlan, int numVerticesToLaunch)  {
+        for (PigTezProgressNotificationListener listener: tezListeners) {
+            listener.dagLaunchNotification(id, dagName, dagPlan, numVerticesToLaunch);
+        }
+    }
+
+    public void dagStartedNotification(String dagName, String assignedApplicationId)  {
+        for (PigTezProgressNotificationListener listener: tezListeners) {
+            listener.dagStartedNotification(id, dagName, assignedApplicationId);
+        }
+    }
+
+    public void dagProgressNotification(String dagName, int numVerticesCompleted, int progress) {
+        for (PigTezProgressNotificationListener listener: tezListeners) {
+            listener.dagProgressNotification(id, dagName, numVerticesCompleted, progress);
+        }
+    }
+
+    public void dagCompletedNotification(String dagName, TezDAGStats tezDAGStats) {
+        for (PigTezProgressNotificationListener listener: tezListeners) {
+            listener.dagCompletedNotification(id, dagName, tezDAGStats.isSuccessful(), tezDAGStats);
+        }
+    }
+
+    public void addDAGSettingsToConf(Configuration conf) {
         LOG.info("Pig script settings are added to the job");
         conf.set(PIG_PROPERTY.HADOOP_VERSION.toString(), getHadoopVersion());
         conf.set(PIG_PROPERTY.VERSION.toString(), getPigVersion());
         conf.set(PIG_PROPERTY.SCRIPT_ID.toString(), id);
         conf.set(PIG_PROPERTY.SCRIPT.toString(), getScript());
         conf.set(PIG_PROPERTY.COMMAND_LINE.toString(), getCommandLine());
+    }
+
+    public void addVertexSettingsToConf(String dagName, TezOperator tezOp, Configuration conf) {
 
         try {
             List<POStore> stores = PlanHelper.getPhysicalOperators(tezOp.plan, POStore.class);
@@ -95,8 +136,8 @@ public class TezScriptState extends Scri
             LOG.warn("unable to get the map loads", e);
         }
 
-        setPigFeature(tezOp, conf);
-        setJobParents(tezOp, conf);
+        setPigFeature(dagName, tezOp, conf);
+        setJobParents(dagName, tezOp, conf);
 
         conf.set("mapreduce.workflow.id", "pig_" + id);
         conf.set("mapreduce.workflow.name", getFileName().isEmpty() ? "default" : getFileName());
@@ -116,32 +157,30 @@ public class TezScriptState extends Scri
         }
     }
 
-    private void setPigFeature(TezOperator tezOp, Configuration conf) {
-        conf.set(PIG_PROPERTY.JOB_FEATURE.toString(), getPigFeature(tezOp));
+    private void setPigFeature(String dagName, TezOperator tezOp, Configuration conf) {
+        if (tezOp.isVertexGroup()) {
+            return;
+        }
+        TezDAGScriptInfo dagInfo = getDAGScriptInfo(dagName);
+        conf.set(PIG_PROPERTY.JOB_FEATURE.toString(), dagInfo.getPigFeatures(tezOp));
         if (scriptFeatures != 0) {
             conf.set(PIG_PROPERTY.SCRIPT_FEATURES.toString(),
                     String.valueOf(scriptFeatures));
         }
-        conf.set(PIG_PROPERTY.JOB_ALIAS.toString(), getAlias(tezOp));
-        conf.set(PIG_PROPERTY.JOB_ALIAS_LOCATION.toString(), getAliasLocation(tezOp));
+        conf.set(PIG_PROPERTY.JOB_ALIAS.toString(), dagInfo.getAlias(tezOp));
+        conf.set(PIG_PROPERTY.JOB_ALIAS_LOCATION.toString(), dagInfo.getAliasLocation(tezOp));
     }
 
-    private void setJobParents(TezOperator tezOp, Configuration conf) {
+    private void setJobParents(String dagName, TezOperator tezOp, Configuration conf) {
+        if (tezOp.isVertexGroup()) {
+            return;
+        }
         // PigStats maintains a job DAG with the job id being updated
         // upon available. Therefore, before a job is submitted, the ids
         // of its parent jobs are already available.
-        JobGraph jg = PigStats.get().getJobGraph();
-        JobStats js = null;
-        Iterator<JobStats> iter = jg.iterator();
-        while (iter.hasNext()) {
-            JobStats job = iter.next();
-            if (job.getName().equals(tezOp.getOperatorKey().toString())) {
-                js = job;
-                break;
-            }
-        }
+        JobStats js = ((TezPigScriptStats)PigStats.get()).getVertexStats(dagName, tezOp.getOperatorKey().toString());
         if (js != null) {
-            List<Operator> preds = jg.getPredecessors(js);
+            List<Operator> preds = js.getPlan().getPredecessors(js);
             if (preds != null) {
                 StringBuilder sb = new StringBuilder();
                 for (Operator op : preds) {
@@ -154,87 +193,173 @@ public class TezScriptState extends Scri
         }
     }
 
-    public String getAlias(TezOperator tezOp) {
-        if (!aliasMap.containsKey(tezOp)) {
-            setAlias(tezOp);
-        }
-        return aliasMap.get(tezOp);
+    public TezDAGScriptInfo setDAGScriptInfo(TezPlanContainerNode tezPlanNode) {
+        TezDAGScriptInfo info = new TezDAGScriptInfo(tezPlanNode.getTezOperPlan());
+        dagScriptInfo.put(tezPlanNode.getOperatorKey().toString(), info);
+        return info;
     }
 
-    private void setAlias(TezOperator tezOp) {
-        ArrayList<String> alias = new ArrayList<String>();
-        String aliasLocationStr = "";
-        try {
-            ArrayList<String> aliasLocation = new ArrayList<String>();
-            new AliasVisitor(tezOp.plan, alias, aliasLocation).visit();
-            aliasLocationStr += LoadFunc.join(aliasLocation, ",");
-            if (!alias.isEmpty()) {
-                Collections.sort(alias);
-            }
-        } catch (VisitorException e) {
-            LOG.warn("unable to get alias", e);
-        }
-        aliasMap.put(tezOp, LoadFunc.join(alias, ","));
-        aliasLocationMap.put(tezOp, aliasLocationStr);
+    public TezDAGScriptInfo getDAGScriptInfo(String dagName) {
+        return dagScriptInfo.get(dagName);
     }
 
-    public String getAliasLocation(TezOperator tezOp) {
-        if (!aliasLocationMap.containsKey(tezOp)) {
-            setAlias(tezOp);
-        }
-        return aliasLocationMap.get(tezOp);
-    }
+    static class TezDAGScriptInfo {
 
-    public String getPigFeature(TezOperator tezOp) {
-        if (featureMap == null) {
-            featureMap = Maps.newHashMap();
-        }
+        private static final Log LOG = LogFactory.getLog(TezDAGScriptInfo.class);
+        private TezOperPlan tezPlan;
+        private String alias;
+        private String aliasLocation;
+        private String features;
+
+        private Map<OperatorKey, String> featuresMap = Maps.newHashMap();
+        private Map<OperatorKey, String> aliasMap = Maps.newHashMap();
+        private Map<OperatorKey, String> aliasLocationMap = Maps.newHashMap();
+
+        class DAGAliasVisitor extends TezOpPlanVisitor {
+
+            private Set<String> aliases;
+            private Set<String> aliasLocations;
+            private BitSet featureSet;
+
+            public DAGAliasVisitor(TezOperPlan plan) {
+                super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
+                this.aliases = new HashSet<String>();
+                this.aliasLocations = new HashSet<String>();
+                this.featureSet = new BitSet();
+            }
+
+            @Override
+            public void visitTezOp(TezOperator tezOp) throws VisitorException {
+                if (tezOp.isVertexGroup()) {
+                    featureSet.set(PIG_FEATURE.UNION.ordinal());
+                    return;
+                }
+                ArrayList<String> aliasList = new ArrayList<String>();
+                String aliasLocationStr = "";
+                try {
+                    ArrayList<String> aliasLocationList = new ArrayList<String>();
+                    new AliasVisitor(tezOp.plan, aliasList, aliasLocationList).visit();
+                    aliasLocationStr += LoadFunc.join(aliasLocationList, ",");
+                    if (!aliasList.isEmpty()) {
+                        Collections.sort(aliasList);
+                        aliases.addAll(aliasList);
+                        aliasLocations.addAll(aliasLocationList);
+                    }
+                } catch (VisitorException e) {
+                    LOG.warn("unable to get alias", e);
+                }
+                aliasMap.put(tezOp.getOperatorKey(), LoadFunc.join(aliasList, ","));
+                aliasLocationMap.put(tezOp.getOperatorKey(), aliasLocationStr);
 
-        String retStr = featureMap.get(tezOp);
-        if (retStr == null) {
-            BitSet feature = new BitSet();
-            feature.clear();
-            if (tezOp.isSkewedJoin()) {
-                feature.set(PIG_FEATURE.SKEWED_JOIN.ordinal());
-            }
-            if (tezOp.isGlobalSort()) {
-                feature.set(PIG_FEATURE.ORDER_BY.ordinal());
-            }
-            if (tezOp.isSampler()) {
-                feature.set(PIG_FEATURE.SAMPLER.ordinal());
-            }
-            if (tezOp.isIndexer()) {
-                feature.set(PIG_FEATURE.INDEXER.ordinal());
-            }
-            if (tezOp.isCogroup()) {
-                feature.set(PIG_FEATURE.COGROUP.ordinal());
-            }
-            if (tezOp.isGroupBy()) {
-                feature.set(PIG_FEATURE.GROUP_BY.ordinal());
-            }
-            if (tezOp.isRegularJoin()) {
-                feature.set(PIG_FEATURE.HASH_JOIN.ordinal());
-            }
-            if (tezOp.isUnion()) {
-                feature.set(PIG_FEATURE.UNION.ordinal());
+
+                BitSet feature = new BitSet();
+                feature.clear();
+                if (tezOp.isSkewedJoin()) {
+                    feature.set(PIG_FEATURE.SKEWED_JOIN.ordinal());
+                }
+                if (tezOp.isGlobalSort()) {
+                    feature.set(PIG_FEATURE.ORDER_BY.ordinal());
+                }
+                if (tezOp.isSampler()) {
+                    feature.set(PIG_FEATURE.SAMPLER.ordinal());
+                }
+                if (tezOp.isIndexer()) {
+                    feature.set(PIG_FEATURE.INDEXER.ordinal());
+                }
+                if (tezOp.isCogroup()) {
+                    feature.set(PIG_FEATURE.COGROUP.ordinal());
+                }
+                if (tezOp.isGroupBy()) {
+                    feature.set(PIG_FEATURE.GROUP_BY.ordinal());
+                }
+                if (tezOp.isRegularJoin()) {
+                    feature.set(PIG_FEATURE.HASH_JOIN.ordinal());
+                }
+                if (tezOp.isUnion()) {
+                    feature.set(PIG_FEATURE.UNION.ordinal());
+                }
+                if (tezOp.isNative()) {
+                    feature.set(PIG_FEATURE.NATIVE.ordinal());
+                }
+                if (tezOp.isLimit() || tezOp.isLimitAfterSort()) {
+                    feature.set(PIG_FEATURE.LIMIT.ordinal());
+                }
+                try {
+                    new FeatureVisitor(tezOp.plan, feature).visit();
+                } catch (VisitorException e) {
+                    LOG.warn("Feature visitor failed", e);
+                }
+                StringBuilder sb = new StringBuilder();
+                for (int i=feature.nextSetBit(0); i>=0; i=feature.nextSetBit(i+1)) {
+                    if (sb.length() > 0) sb.append(",");
+                    sb.append(PIG_FEATURE.values()[i].name());
+                }
+                featuresMap.put(tezOp.getOperatorKey(), sb.toString());
+                featureSet.or(feature);
             }
-            if (tezOp.isNative()) {
-                feature.set(PIG_FEATURE.NATIVE.ordinal());
+
+            @Override
+            public void visit() throws VisitorException {
+                super.visit();
+                if (!aliases.isEmpty()) {
+                    ArrayList<String> aliasList = new ArrayList<String>(aliases);
+                    ArrayList<String> aliasLocationList = new ArrayList<String>(aliasLocations);
+                    Collections.sort(aliasList);
+                    Collections.sort(aliasLocationList);
+                    alias = LoadFunc.join(aliasList, ",");
+                    aliasLocation = LoadFunc.join(aliasLocationList, ",");
+                }
+                StringBuilder sb = new StringBuilder();
+                for (int i = featureSet.nextSetBit(0); i >= 0; i = featureSet.nextSetBit(i+1)) {
+                    if (sb.length() > 0) sb.append(",");
+                    sb.append(PIG_FEATURE.values()[i].name());
+                }
+                features = sb.toString();
             }
+
+        }
+
+        public TezDAGScriptInfo(TezOperPlan tezPlan) {
+            this.tezPlan = tezPlan;
+            initialize();
+        }
+
+        private void initialize() {
             try {
-                new FeatureVisitor(tezOp.plan, feature).visit();
+                new DAGAliasVisitor(tezPlan).visit();
             } catch (VisitorException e) {
-                LOG.warn("Feature visitor failed", e);
+                LOG.warn("Cannot calculate alias information for DAG", e);
             }
-            StringBuilder sb = new StringBuilder();
-            for (int i=feature.nextSetBit(0); i>=0; i=feature.nextSetBit(i+1)) {
-                if (sb.length() > 0) sb.append(",");
-                sb.append(PIG_FEATURE.values()[i].name());
-            }
-            retStr = sb.toString();
-            featureMap.put(tezOp, retStr);
         }
-        return retStr;
+
+        public String getAlias() {
+            return alias;
+        }
+
+        public String getAliasLocation() {
+            return aliasLocation;
+        }
+
+        public String getPigFeatures() {
+            return features;
+        }
+
+        public String getAlias(TezOperator tezOp) {
+            return aliasMap.get(tezOp.getOperatorKey());
+        }
+
+        public String getAliasLocation(TezOperator tezOp) {
+            return aliasLocationMap.get(tezOp.getOperatorKey());
+        }
+
+        public String getPigFeatures(TezOperator tezOp) {
+            return featuresMap.get(tezOp.getOperatorKey());
+        }
+
     }
 
 }
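
For context on the new bookkeeping: visitTezOp() records each vertex's
features in a java.util.BitSet indexed by PIG_FEATURE ordinals and folds them
into a DAG-wide set, which visit() renders as a comma-separated string. A
minimal, compilable sketch of that idiom follows; the Feature enum is a
stand-in, not Pig's real PIG_FEATURE:

    import java.util.BitSet;
    import java.util.StringJoiner;

    public class FeatureBitsSketch {
        enum Feature { SKEWED_JOIN, ORDER_BY, SAMPLER, GROUP_BY, HASH_JOIN, UNION, LIMIT }

        // Render the set bits as a comma-separated list of enum names, as the
        // StringBuilder loops in visitTezOp() and visit() do.
        static String render(BitSet bits) {
            StringJoiner sj = new StringJoiner(",");
            for (int i = bits.nextSetBit(0); i >= 0; i = bits.nextSetBit(i + 1)) {
                sj.add(Feature.values()[i].name());
            }
            return sj.toString();
        }

        public static void main(String[] args) {
            BitSet vertex = new BitSet();
            vertex.set(Feature.GROUP_BY.ordinal());
            vertex.set(Feature.LIMIT.ordinal());

            BitSet dag = new BitSet();
            dag.or(vertex);                   // union per-vertex bits into the DAG set
            System.out.println(render(dag));  // prints: GROUP_BY,LIMIT
        }
    }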

Modified: pig/branches/spark/src/org/apache/pig/validator/BlackAndWhitelistFilter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/validator/BlackAndWhitelistFilter.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/validator/BlackAndWhitelistFilter.java (original)
+++ pig/branches/spark/src/org/apache/pig/validator/BlackAndWhitelistFilter.java Thu Nov 27 12:49:54 2014
@@ -35,12 +35,17 @@ public final class BlackAndWhitelistFilt
     private static final Splitter SPLITTER = Splitter.on(',').trimResults()
             .omitEmptyStrings();
 
-    private final PigServer pigServer;
+    private final PigContext context;
     private final Set<String> whitelist;
     private final Set<String> blacklist;
 
     public BlackAndWhitelistFilter(PigServer pigServer) {
-        this.pigServer = pigServer;
+        this(pigServer.getPigContext());
+    }
+
+    public BlackAndWhitelistFilter(PigContext context) {
+        this.context = context;
+
         whitelist = Sets.newHashSet();
         blacklist = Sets.newHashSet();
 
@@ -48,7 +53,6 @@ public final class BlackAndWhitelistFilt
     }
 
     private void init() {
-        PigContext context = pigServer.getPigContext();
         String whitelistConfig = context.getProperties().getProperty(PigConfiguration.PIG_WHITELIST);
 
         if (whitelistConfig != null) {
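
The constructor change above is plain delegation: the PigServer overload now
unwraps the context and forwards to a new PigContext-based constructor, so
callers that only hold a PigContext can build the filter directly. A
compilable sketch of the pattern, with stand-in classes rather than the real
Pig types:

    // Stand-ins for PigServer/PigContext; only the delegation shape matters.
    class Context { }

    class Server {
        private final Context context = new Context();
        Context getContext() { return context; }
    }

    class Filter {
        private final Context context;

        Filter(Server server) {          // old entry point, kept for compatibility
            this(server.getContext());   // unwrap and delegate
        }

        Filter(Context context) {        // new primary constructor
            this.context = context;
            init();                      // initialization happens exactly once, here
        }

        private void init() { /* read whitelist/blacklist properties */ }
    }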

Modified: pig/branches/spark/src/org/apache/pig/validator/PigCommandFilter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/validator/PigCommandFilter.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/validator/PigCommandFilter.java (original)
+++ pig/branches/spark/src/org/apache/pig/validator/PigCommandFilter.java Thu Nov 27 12:49:54 2014
@@ -34,7 +34,7 @@ import org.apache.pig.impl.logicalLayer.
 public interface PigCommandFilter {
 
     public enum Command {
-        FS, LS, SH, MAPREDUCE, REGISTER, SET, CAT, CD, DUMP, KILL, PWD, MV, CP, COPYTOLOCAL, COPYFROMLOCAL, MKDIR, RM, RMF, ILLUSTRATE
+        DEFINE, DECLARE, DEFAULT, EXPLAIN, EXEC, FS, IMPORT, LS, SH, MAPREDUCE, REGISTER, SET, CAT, CD, DUMP, KILL, PWD, MV, CP, COPYTOLOCAL, COPYFROMLOCAL, MKDIR, RM, RMF, RUN, ILLUSTRATE
     }
 
     /**
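
With DEFINE, DECLARE, EXEC, RUN and the other preprocessor commands added to
Command, a comma-separated blacklist property can be resolved straight into an
EnumSet. A hedged sketch, assuming Guava on the classpath; the trimmed enum
and the parseBlacklist/validate names are illustrative, not Pig API:

    import java.util.EnumSet;
    import java.util.Locale;

    import com.google.common.base.Splitter;

    class CommandFilterSketch {
        enum Command { DEFINE, DECLARE, EXEC, FS, IMPORT, RUN, SET, SH }

        private static final Splitter SPLITTER =
                Splitter.on(',').trimResults().omitEmptyStrings();

        // "exec, run,sh" -> {EXEC, RUN, SH}; unknown names fail fast in valueOf().
        static EnumSet<Command> parseBlacklist(String config) {
            EnumSet<Command> blacklist = EnumSet.noneOf(Command.class);
            for (String name : SPLITTER.split(config)) {
                blacklist.add(Command.valueOf(name.toUpperCase(Locale.ROOT)));
            }
            return blacklist;
        }

        static void validate(Command cmd, EnumSet<Command> blacklist) {
            if (blacklist.contains(cmd)) {
                throw new IllegalArgumentException(cmd + " is disabled by the blacklist");
            }
        }
    }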

Modified: pig/branches/spark/src/python/streaming/controller.py
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/python/streaming/controller.py?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/python/streaming/controller.py (original)
+++ pig/branches/spark/src/python/streaming/controller.py Thu Nov 27 12:49:54 2014
@@ -96,7 +96,7 @@ class PythonStreamingController:
 
         if is_illustrate or udf_logging.udf_log_level != logging.DEBUG:
             #Only log output for illustrate after we get the flag to capture output.
-            sys.stdout = open("/dev/null", 'w')
+            sys.stdout = open(os.devnull, 'w')
         else:
             sys.stdout = self.output_stream
 
@@ -157,7 +157,14 @@ class PythonStreamingController:
         input_str = input_stream.readline()
 
         while input_str.endswith(END_RECORD_DELIM) == False:
-            input_str += input_stream.readline()
+            line = input_stream.readline()
+            if line == '':
+                input_str = ''
+                break
+            input_str += line
+
+        if input_str == '':
+            return END_OF_STREAM
 
         if input_str == TURN_ON_OUTPUT_CAPTURING:
             logging.debug("Turned on Output Capturing")
@@ -297,6 +304,12 @@ def _deserialize_collection(input_str, r
     else:
         return list_result
 
+def wrap_tuple(o, serialized_item):
+    if type(o) != tuple:
+        return WRAPPED_TUPLE_START + serialized_item + WRAPPED_TUPLE_END
+    else:
+        return serialized_item
+
 def serialize_output(output, utfEncodeAllFields=False):
     """
     @param utfEncodeStrings - Generally we want to utf encode only strings.  But for
@@ -314,7 +327,7 @@ def serialize_output(output, utfEncodeAl
                 WRAPPED_TUPLE_END)
     elif output_type == list:
         return (WRAPPED_BAG_START +
-                WRAPPED_FIELD_DELIMITER.join([serialize_output(o, utfEncodeAllFields) for o in output]) +
+                WRAPPED_FIELD_DELIMITER.join([wrap_tuple(o, serialize_output(o, utfEncodeAllFields)) for o in output]) +
                 WRAPPED_BAG_END)
     elif output_type == dict:
         return (WRAPPED_MAP_START +
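
Two behaviors change in controller.py: readline() returning an empty string
signals EOF in Python, so the record loop must bail out instead of spinning
forever on a truncated record, and bare bag items are now wrapped so a bag
always serializes as a bag of tuples. A Java analogue of the EOF guard, for
illustration only; readLine() returning null plays the role of Python's empty
string:

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.StringReader;

    class RecordReaderSketch {
        static final String END_RECORD_DELIM = "|&\n";   // illustrative delimiter

        // Append lines until the record delimiter appears; stop cleanly at EOF.
        static String readRecord(BufferedReader in) throws IOException {
            StringBuilder record = new StringBuilder();
            String line;
            while ((line = in.readLine()) != null) {
                record.append(line).append('\n');
                if (record.toString().endsWith(END_RECORD_DELIM)) {
                    return record.toString();
                }
            }
            return null;   // end of stream: no complete record left
        }

        public static void main(String[] args) throws IOException {
            BufferedReader in = new BufferedReader(new StringReader("a\tb|&\npartial"));
            System.out.print(readRecord(in));    // complete record: a<TAB>b|&
            System.out.println(readRecord(in));  // null -> caller treats as END_OF_STREAM
        }
    }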

Modified: pig/branches/spark/test/e2e/harness/TestDriver.pm
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/harness/TestDriver.pm?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/harness/TestDriver.pm (original)
+++ pig/branches/spark/test/e2e/harness/TestDriver.pm Thu Nov 27 12:49:54 2014
@@ -717,7 +717,7 @@ sub runTestGroup() {
 					$testStatuses->{$testName} = $failedStr;
 
 				}
-                                $msg .= "\nEnding test $testName at " . time ."\n";
+                $msg .= " at " . time . "\nEnding test $testName at " . time . "\n";
 				print $subLog $msg;
 				$duration = $endTime - $beginTime;
 				$dbinfo{'duration'} = $duration;

Modified: pig/branches/spark/test/e2e/pig/build.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/build.xml?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/build.xml (original)
+++ pig/branches/spark/test/e2e/pig/build.xml Thu Nov 27 12:49:54 2014
@@ -18,15 +18,14 @@
 <project name="TestHarnessPigTests" default="test">
 
   <property name="pig.dir" value="${basedir}/../../.."/>
-  <property name="ivy.dir" location="${pig.dir}/ivy" />
+  <property name="pig.base.dir" value="${basedir}/../../.."/>
+  <property name="ivy.dir" location="${pig.base.dir}/ivy" />
   <loadproperties srcfile="${ivy.dir}/libraries.properties"/>
 
-  <property name="jython.jar"
-    value="${pig.dir}/build/ivy/lib/Pig/jython-standalone-${jython.version}.jar"/>
-  <property name="jruby.jar"
-    value="${pig.dir}/build/ivy/lib/Pig/jruby-complete-${jruby.version}.jar"/>
+  <property name="piggybank.jar"
+	value="${pig.base.dir}/contrib/piggybank/java/piggybank.jar"/>
   <property name="hive.lib.dir"
-	value="${pig.dir}/build/ivy/lib/Pig"/>
+	value="${pig.base.dir}/build/ivy/lib/Pig"/>
 
   <condition property="hive.hadoop.shims.version" value="0.23" else="0.20S">
     <equals arg1="${hadoopversion}" arg2="23" />
@@ -216,23 +215,14 @@
 
   <!-- Check that the necessary properties are setup -->
   <target name="property-check">
-    <fail message="Please set the property harness.cluster.conf to the conf directory of your hadoop installation or harness.hadoop.home to HADOOP_HOME for your Hadoop installation.">
+    <fail message="Please set the properties harness.cluster.conf to the conf directory of your Hadoop installation, harness.hadoop.home to the HADOOP_HOME of your Hadoop installation, and harness.cluster.bin to the hadoop executable of your Hadoop installation.">
         <condition>
             <not>
-                <or>
+                <and>
                 <isset property="harness.cluster.conf"/>
                 <isset property="harness.hadoop.home"/>
-                </or>
-            </not>
-        </condition>
-    </fail>
-    <fail message="Please set the property harness.cluster.bin to the binary executable of your hadoop installation or harness.hadoop.home to HADOOP_HOME for your Hadoop installation.">
-        <condition>
-            <not>
-                <or>
                 <isset property="harness.cluster.bin"/>
-                <isset property="harness.hadoop.home"/>
-                </or>
+                </and>
             </not>
         </condition>
     </fail>
@@ -279,6 +269,7 @@
       <env key="HADOOP_CONF_DIR" value="${harness.cluster.conf}"/>
       <env key="PIG_USE_PYTHON" value="${harness.use.python}"/>
       <env key="PH_CLUSTER_BIN" value="${harness.cluster.bin}"/>
+      <env key="PH_PIGGYBANK_JAR" value="${piggybank.jar}"/>
       <env key="PH_HIVE_LIB_DIR" value="${hive.lib.dir}"/>
       <env key="PH_HIVE_VERSION" value="${hive.version}"/>
       <env key="PH_HIVE_SHIMS_VERSION" value="${hive.hadoop.shims.version}"/>

Modified: pig/branches/spark/test/e2e/pig/conf/default.conf
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/conf/default.conf?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/conf/default.conf (original)
+++ pig/branches/spark/test/e2e/pig/conf/default.conf Thu Nov 27 12:49:54 2014
@@ -49,7 +49,7 @@ $cfg = {
     , 'testconfigpath'   => "$ENV{HADOOP_CONF_DIR}"
     , 'funcjarPath'      => "$ENV{PH_ROOT}/lib/java"
     , 'paramPath'        => "$ENV{PH_ROOT}/paramfiles"
-    , 'piggybankjarPath' => "$ENV{PH_PIG}/contrib/piggybank/java"
+    , 'piggybankjarPath' => "$ENV{PH_PIGGYBANK_JAR}"
     , 'pigpath'          => "$ENV{PH_PIG}"
     , 'oldpigpath'       => "$ENV{PH_OLDPIG}"
     , 'hcatbin'          => "$ENV{HCAT_BIN}"

Modified: pig/branches/spark/test/e2e/pig/conf/local.conf
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/conf/local.conf?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/conf/local.conf (original)
+++ pig/branches/spark/test/e2e/pig/conf/local.conf Thu Nov 27 12:49:54 2014
@@ -15,7 +15,7 @@
 #  limitations under the License.                                                      
                                                                                        
 my $me = `whoami`;
-chomp $me;
+$me =~ s/[^a-zA-Z0-9]//g;
 
 # The contents of this file can be rewritten to fit your installation.
 # Also, you can define the following environment variables and set things up as in the test setup
@@ -40,13 +40,13 @@ $cfg = {
     #TEST
     , 'benchmarkPath'    => "$ENV{PH_OUT}/benchmarks"
     , 'scriptPath'       => "$ENV{PH_ROOT}/libexec"
-    , 'tmpPath'          => '/tmp/pigtest'
+    , 'tmpPath'          => 'tmp/pigtest'
 
     #PIG
     , 'testconfigpath'   => "$ENV{PH_CLUSTER}/conf/"
     , 'funcjarPath'      => "$ENV{PH_ROOT}/lib/java"
     , 'paramPath'        => "$ENV{PH_ROOT}/paramfiles"
-    , 'piggybankjarPath' => "$ENV{PH_PIG}/contrib/piggybank/java"
+    , 'piggybankjarPath' => "$ENV{PH_PIGGYBANK_JAR}"
     , 'pigpath'          => "$ENV{PH_PIG}"
 	, 'oldpigpath'       => "$ENV{PH_OLDPIG}"
     , 'exectype'         => 'local'
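
Replacing chomp with the substitution matters on Windows, where `whoami` can
print something like "DOMAIN\user" plus a CRLF; reducing the name to
[a-zA-Z0-9] leaves a token that is safe to embed in the relative tmpPath
above. A Java rendering of the same sanitization, for illustration:

    public class UserTokenSketch {
        public static void main(String[] args) {
            String raw = "DOMAIN\\user\r\n";                 // what `whoami` may emit on Windows
            String me = raw.replaceAll("[^a-zA-Z0-9]", "");  // Perl: $me =~ s/[^a-zA-Z0-9]//g;
            System.out.println("tmp/pigtest/" + me);         // e.g. tmp/pigtest/DOMAINuser
        }
    }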

Modified: pig/branches/spark/test/e2e/pig/conf/rpm.conf
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/conf/rpm.conf?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/conf/rpm.conf (original)
+++ pig/branches/spark/test/e2e/pig/conf/rpm.conf Thu Nov 27 12:49:54 2014
@@ -15,7 +15,7 @@
 #  limitations under the License.                                                      
                                                                                        
 my $me = `whoami`;
-chomp $me;
+$me =~ s/[^a-zA-Z0-9]//g;
 
 # The contents of this file can be rewritten to fit your installation.
 # Also, you can define the following environment variables and set things up as in the test setup
@@ -43,14 +43,13 @@ $cfg = {
     #TEST
     , 'benchmarkPath'    => "$ENV{PH_OUT}/benchmarks"
     , 'scriptPath'       => "$ENV{PH_ROOT}/libexec"
-    , 'tmpPath'          => '/tmp/pigtest'
+    , 'tmpPath'          => 'tmp/pigtest'
 
     #PIG
     , 'testconfigpath'   => "$ENV{HADOOP_CONF_DIR}"
     , 'funcjarPath'      => "$ENV{PH_ROOT}/lib/java"
-    , 'piggybankjarPath' => "/usr/lib/pig"
+    , 'piggybankjarPath' => "/usr/lib/pig/lib/piggybank.jar"
     , 'paramPath'        => "$ENV{PH_ROOT}/paramfiles"
-    , 'piggybankjarPath' => "/usr/lib/pig"
     , 'pigpath'          => "/usr"
     , 'oldpigpath'       => "$ENV{PH_OLDPIG}"
     , 'hcatbin'          => "$ENV{HCAT_BIN}"
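
This hunk also drops a duplicated 'piggybankjarPath' key. Perl hash literals
silently keep the last duplicate, so leaving the second key in place would
have overridden the new jar path with "/usr/lib/pig". Java maps overwrite
just as silently, as this small illustration shows:

    import java.util.HashMap;
    import java.util.Map;

    public class DuplicateKeySketch {
        public static void main(String[] args) {
            Map<String, String> cfg = new HashMap<>();
            cfg.put("piggybankjarPath", "/usr/lib/pig/lib/piggybank.jar");
            cfg.put("piggybankjarPath", "/usr/lib/pig");      // silently overwrites
            System.out.println(cfg.get("piggybankjarPath"));  // /usr/lib/pig
        }
    }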

Modified: pig/branches/spark/test/e2e/pig/conf/tez.conf
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/conf/tez.conf?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/conf/tez.conf (original)
+++ pig/branches/spark/test/e2e/pig/conf/tez.conf Thu Nov 27 12:49:54 2014
@@ -15,7 +15,7 @@
 #  limitations under the License.                                                      
                                                                                        
 my $me = `whoami`;
-chomp $me;
+$me =~ s/[^a-zA-Z0-9]//g;
 
 # The contents of this file can be rewritten to fit your installation.
 # Also, you can define the following environment variables and set things up as in the test setup
@@ -43,13 +43,13 @@ $cfg = {
     #TEST
     , 'benchmarkPath'    => "$ENV{PH_OUT}/benchmarks"
     , 'scriptPath'       => "$ENV{PH_ROOT}/libexec"
-    , 'tmpPath'          => '/tmp/pigtest'
+    , 'tmpPath'          => 'tmp/pigtest'
 
     #PIG
     , 'testconfigpath'   => "$ENV{HADOOP_CONF_DIR}"
     , 'funcjarPath'      => "$ENV{PH_ROOT}/lib/java"
     , 'paramPath'        => "$ENV{PH_ROOT}/paramfiles"
-    , 'piggybankjarPath' => "$ENV{PH_PIG}/contrib/piggybank/java"
+    , 'piggybankjarPath' => "$ENV{PH_PIGGYBANK_JAR}"
     , 'pigpath'          => "$ENV{PH_PIG}"
     , 'oldpigpath'       => "$ENV{PH_OLDPIG}"
     , 'hcatbin'          => "$ENV{HCAT_BIN}"

Modified: pig/branches/spark/test/e2e/pig/deployers/LocalDeployer.pm
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/deployers/LocalDeployer.pm?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/deployers/LocalDeployer.pm (original)
+++ pig/branches/spark/test/e2e/pig/deployers/LocalDeployer.pm Thu Nov 27 12:49:54 2014
@@ -120,11 +120,6 @@ sub generateData
             'rows' => 10000,
             'outfile' => "singlefile/studenttab10k",
         }, {
-            'name' => "studenttab20m",
-            'filetype' => "studenttab",
-            'rows' => 20000000,
-            'outfile' => "singlefile/studenttab20m",
-        }, {
             'name' => "votertab10k",
             'filetype' => "votertab",
             'rows' => 10000,
@@ -210,11 +205,6 @@ sub generateData
             'rows' => 5000,
             'outfile' => "types/numbers.txt",
         }, {
-            'name' => "biggish",
-            'filetype' => "biggish",
-            'rows' => 1000000,
-            'outfile' => "singlefile/biggish",
-        }, {
             'name' => "prerank",
             'filetype' => "ranking",
             'rows' => 30,

Modified: pig/branches/spark/test/e2e/pig/drivers/TestDriverPig.pm
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/drivers/TestDriverPig.pm?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/drivers/TestDriverPig.pm (original)
+++ pig/branches/spark/test/e2e/pig/drivers/TestDriverPig.pm Thu Nov 27 12:49:54 2014
@@ -73,7 +73,7 @@ sub replaceParameters
     $cmd =~ s/:INPATH:/$testCmd->{'inpathbase'}/g;
     $cmd =~ s/:OUTPATH:/$outfile/g;
     $cmd =~ s/:FUNCPATH:/$testCmd->{'funcjarPath'}/g;
-    $cmd =~ s/:PIGGYBANKPATH:/$testCmd->{'piggybankjarPath'}/g;
+    $cmd =~ s/:PIGGYBANKJAR:/$testCmd->{'piggybankjarPath'}/g;
     $cmd =~ s/:PIGPATH:/$testCmd->{'pigpath'}/g;
     $cmd =~ s/:RUNID:/$testCmd->{'UID'}/g;
     $cmd =~ s/:USRHOMEPATH:/$testCmd->{'userhomePath'}/g;
@@ -672,6 +672,9 @@ sub generateBenchmark
         if ((Util::isWindows()||Util::isCygwin()) && $testCmd->{'pig_win'}) {
            $modifiedTestCmd{'pig'} = $testCmd->{'pig_win'};
        }
+       if ($testCmd->{'hadoopversion'} eq '23' && $testCmd->{'pig23'}) {
+           $modifiedTestCmd{'pig'} = $testCmd->{'pig23'};
+       }
 		# Change so we're looking at the old version of Pig
                 if (defined $testCmd->{'oldpigpath'} && $testCmd->{'oldpigpath'} ne "") {
 		    $modifiedTestCmd{'pigpath'} = $testCmd->{'oldpigpath'};

Modified: pig/branches/spark/test/e2e/pig/tests/bigdata.conf
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/bigdata.conf?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/tests/bigdata.conf (original)
+++ pig/branches/spark/test/e2e/pig/tests/bigdata.conf Thu Nov 27 12:49:54 2014
@@ -28,11 +28,12 @@ $cfg = {
 
 	'groups' => [
 		{
-		'name' => 'BigData',
+		'name' => 'BigData_Checkin',
 		'tests' => [
 			{
 			'num' => 1,
             ,'floatpostprocess' => 1
+            ,'java_params' => ['-Dpig.tez.auto.parallelism=false']
             ,'delimiter' => '	',
 			'pig' => q\a = load ':INPATH:/singlefile/studenttab20m' as (name, age, gpa);
 b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
@@ -48,18 +49,6 @@ store i into ':OUTPATH:';\,
 			{
 			'num' => 2,
 			'pig' => q\a = load ':INPATH:/singlefile/studenttab20m' as (name, age, gpa);
-a1 = filter a by age < '50';
-b = group a1 by (name, age);
-c = foreach b generate group as g, AVG(a1.gpa);
-d = filter c by $1 > 3.0;
-d1 = foreach d generate g.$0 as name, g.$1 as age, $1 as gpa; 
-e = group d1 by name;
-f = foreach e generate group, AVG(d1.age);
-store f into ':OUTPATH:';\,
-			},
-			{
-			'num' => 3,
-			'pig' => q\a = load ':INPATH:/singlefile/studenttab20m' as (name, age, gpa);
 b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
 c = foreach a generate name;
 d = foreach b generate name;
@@ -68,57 +57,84 @@ f = union d, e;
 g = distinct f parallel 20;
 store g into ':OUTPATH:';\,
 			},
-			{
-			'num' => 4,
-			'pig' => q\a = load ':INPATH:/singlefile/studenttab20m' as (name, age, gpa);
+		]
+		},
+		{
+        'name' => 'BigData_Group',
+        'tests' => [
+            {
+            'num' => 1,
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab20m' as (name, age, gpa);
+a1 = filter a by age < '50';
+b = group a1 by (name, age);
+c = foreach b generate group as g, AVG(a1.gpa);
+d = filter c by $1 > 3.0;
+d1 = foreach d generate g.$0 as name, g.$1 as age, $1 as gpa; 
+e = group d1 by name;
+f = foreach e generate group, AVG(d1.age);
+store f into ':OUTPATH:';\,
+            },
+            {
+            'num' => 2,
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab20m' as (name, age, gpa);
 b = group a all parallel 20;
 c = foreach b generate COUNT(a.$0);
 store c into ':OUTPATH:';\,
-			},
-            		{
-			'num' => 5,
-			'pig' => q\a = load ':INPATH:/singlefile/studenttab20m' as (name, age, gpa);
+            },
+            {
+            'num' => 3,
+            'java_params' => ['-Dpig.exec.mapPartAgg=true'],
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab20m' as (name, age, gpa);
 b = group a by name parallel 20;
 c = foreach b generate group, COUNT($1);
 store c into ':OUTPATH:';\,
-			},
-            		{
-			'num' => 6,
-			'pig' => q\
+            },
+        ]
+        },
+        {
+        'name' => 'BigData_Stream',
+        'tests' => [
+            {
+            'num' => 1,
+            'pig' => q\
 define cmd `perl PigStreaming.pl` ship(':SCRIPTHOMEPATH:/PigStreaming.pl') stderr('CMD' limit 3);
 a = load ':INPATH:/singlefile/studenttab20m' as (name, age, gpa);
 b = stream a through cmd as (n, a, g);
 c = foreach b generate n, a;
 store c into ':OUTPATH:';\,
-			},
-			{
-			'num' => 7,
-			'pig' => q\a = load ':INPATH:/singlefile/studenttab20m' as (name, age, gpa);
+            },
+        ]
+        },
+        {
+        'name' => 'BigData_Order',
+        'tests' => [
+            {
+            'num' => 1,
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab20m' as (name, age, gpa);
 b = order a by name parallel 20;
 store b into ':OUTPATH:';\,
-			'sortArgs' => ['-t', '	', '-k', '1,1'],
-			},
-			{
-			'num' => 8,
-			'pig' => q\a = load ':INPATH:/singlefile/studenttab20m' as (name:chararray, age:int, gpa:double);
+            'sortArgs' => ['-t', '	', '-k', '1,1'],
+            },
+            {
+            'num' => 2,
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab20m' as (name:chararray, age:int, gpa:double);
 b = order a by name, age desc parallel 20;
 store b into ':OUTPATH:';\,
-			'sortArgs' => ['-t', '	', '-k', '1,1', '-k', '2nr,2nr'],
-			},
-			{
-			'num' => 9,
-			'pig' => q\A = load ':INPATH:/singlefile/studenttab20m' as (name, age, gpa);
+            'sortArgs' => ['-t', '	', '-k', '1,1', '-k', '2nr,2nr'],
+            },
+            {
+            'num' => 3,
+            'pig' => q\A = load ':INPATH:/singlefile/studenttab20m' as (name, age, gpa);
 B = filter A by age > 20;
 C = group B by name;
 D = foreach C generate group, COUNT(B) PARALLEL 16;
 E = order D by $0 PARALLEL 16;
 F = limit E 10;
 store F into ':OUTPATH:';\,
-			'sortArgs' => ['-t', '	', '-k', '1,1'],
-			},
-
-		]
-		},
+            'sortArgs' => ['-t', '	', '-k', '1,1'],
+            },
+        ]
+        },
 	]
 }
 ;

Modified: pig/branches/spark/test/e2e/pig/tests/grunt.conf
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/grunt.conf?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/tests/grunt.conf (original)
+++ pig/branches/spark/test/e2e/pig/tests/grunt.conf Thu Nov 27 12:49:54 2014
@@ -44,7 +44,7 @@ $cfg = {
                         'num' => 2,
                         'pig' => "pwd",
                         'execonly' => 'mapred,tez', # don't have a clue what their cwd will be for local mode
-                        'expected_out_regex' => "hdfs:",
+                        'expected_out_regex' => "/user",
                         'rc' => 0
 
                       },{
@@ -55,10 +55,8 @@ $cfg = {
 
                       },{
                         'num' => 6,
-                        'pig' => q\
-sh touch /bin/bad
-\,
-                          ,'expected_err_regex' => "Permission denied"
+                        'pig' => "cat nonexist"
+                          ,'expected_err_regex' => "does not exist"
                           ,'rc' => 5
 
                         },{

Modified: pig/branches/spark/test/e2e/pig/tests/macro.conf
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/macro.conf?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/tests/macro.conf (original)
+++ pig/branches/spark/test/e2e/pig/tests/macro.conf Thu Nov 27 12:49:54 2014
@@ -373,10 +373,16 @@ $cfg = {
                       A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
                       x = test(A);
                       store x into ':OUTPATH:';#,
+          'pig_win' => q#define CMD `perl -ne "print $_;"`;
+                      define test(in) returns B {
+                          $B = stream $in through CMD as (name, age, gpa);
+                      }
+
+                      A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+                      x = test(A);
+                      store x into ':OUTPATH:';#,
           'verify_pig_script' => q#A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
-                                   define CMD `perl -ne 'print $_;'`;
-                                   B = stream A through CMD as (name, age, gpa);
-                                   store B into ':OUTPATH:';#,
+                                   store A into ':OUTPATH:';#,
           'floatpostprocess' => 1,
           'delimiter' => '  ' 
         },

Modified: pig/branches/spark/test/e2e/pig/tests/multiquery.conf
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/multiquery.conf?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/tests/multiquery.conf (original)
+++ pig/branches/spark/test/e2e/pig/tests/multiquery.conf Thu Nov 27 12:49:54 2014
@@ -236,6 +236,14 @@ $cfg = {
                         C = stream B through CMD2 as (name, age, gpa);
                         D = JOIN B by name, C by name;
                         store D into ':OUTPATH:.2'; #,
+            'pig_win' => q# define CMD1 `perl -ne "print $_;"`;
+                        define CMD2 `perl -ne "print $_;"`;
+                        A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+                        B = stream A through CMD1 as (name, age, gpa);
+                        store B into ':OUTPATH:.1';
+                        C = stream B through CMD2 as (name, age, gpa);
+                        D = JOIN B by name, C by name;
+                        store D into ':OUTPATH:.2'; #,
             'sql' => "select name, age, gpa from studenttab10k;
                       select A.name, A.age, A.gpa, B.name, B.age, B.gpa 
                       from studenttab10k as A join studenttab10k as B using(name);",