You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by zl...@apache.org on 2017/02/22 09:43:46 UTC

svn commit: r1783988 [14/24] - in /pig/branches/spark: ./ bin/ conf/ contrib/piggybank/java/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachel...

Modified: pig/branches/spark/src/org/apache/pig/tools/parameters/PreprocessorContext.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/parameters/PreprocessorContext.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/parameters/PreprocessorContext.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/parameters/PreprocessorContext.java Wed Feb 22 09:43:41 2017
@@ -27,6 +27,8 @@ import java.io.FileReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.StringReader;
+import java.util.ArrayDeque;
+import java.util.Deque;
 import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;
@@ -40,20 +42,26 @@ import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.validator.BlackAndWhitelistFilter;
 import org.apache.pig.validator.PigCommandFilter;
-import org.python.google.common.base.Preconditions;
 
 public class PreprocessorContext {
 
-    private Map<String, String> param_val;
+    private int tableinitsize = 10;
+    private Deque<Map<String,String>> param_val_stack;
 
-    // used internally to detect when a param is set multiple times,
-    // but it set with the same value so it's ok not to log a warning
-    private Map<String, String> param_source;
-    
     private PigContext pigContext;
 
     public Map<String, String> getParamVal() {
-        return param_val;
+        Map <String, String> ret = new Hashtable <String, String>(tableinitsize);
+
+        //stack (deque) iterates LIFO
+        for (Map <String, String> map : param_val_stack ) {
+            for (Map.Entry<String, String> entry : map.entrySet()) {
+                if( ! ret.containsKey(entry.getKey()) ) {
+                    ret.put(entry.getKey(), entry.getValue());
+                }
+            }
+        }
+        return ret;
     }
 
     private final Log log = LogFactory.getLog(getClass());
@@ -63,24 +71,15 @@ public class PreprocessorContext {
      *                smaller number only impacts performance
      */
     public PreprocessorContext(int limit) {
-        param_val = new Hashtable<String, String> (limit);
-        param_source = new Hashtable<String, String> (limit);
-    }
-
-    public PreprocessorContext(Map<String, String> paramVal) {
-        param_val = paramVal;
-        param_source = new Hashtable<String, String>(paramVal);
+        tableinitsize = limit;
+        param_val_stack = new ArrayDeque<Map<String,String>> ();
+        param_val_stack.push(new Hashtable<String, String> (tableinitsize));
     }
 
     public void setPigContext(PigContext context) {
         this.pigContext = context;
     }
 
-    /*
-    public  void processLiteral(String key, String val) {
-        processLiteral(key, val, true);
-    } */
-
     /**
      * This method generates parameter value by running specified command
      *
@@ -102,20 +101,35 @@ public class PreprocessorContext {
         processOrdLine(key, val, true);
     }
 
-    /*
-    public  void processLiteral(String key, String val, Boolean overwrite) {
+    public void paramScopePush() {
+        param_val_stack.push( new Hashtable<String, String> (tableinitsize) );
+    }
 
-        if (param_val.containsKey(key)) {
-            if (overwrite) {
-                log.warn("Warning : Multiple values found for " + key + ". Using value " + val);
-            } else {
-                return;
+    public void paramScopePop() {
+        param_val_stack.pop();
+    }
+
+    public boolean paramval_containsKey(String key) {
+        for (Map <String, String> map : param_val_stack ) {
+            if( map.containsKey(key) ) {
+                return true;
             }
         }
+        return false;
+    }
 
-        String sub_val = substitute(val);
-        param_val.put(key, sub_val);
-    } */
+    public String paramval_get(String key) {
+        for (Map <String, String> map : param_val_stack ) {
+            if( map.containsKey(key) ) {
+                return map.get(key);
+            }
+        }
+        return null;
+    }
+
+    public void paramval_put(String key, String value) {
+        param_val_stack.peek().put(key, value);
+    }
 
     /**
      * This method generates parameter value by running specified command
@@ -129,21 +143,21 @@ public class PreprocessorContext {
             filter.validate(PigCommandFilter.Command.SH);
         }
 
-        if (param_val.containsKey(key)) {
-            if (param_source.get(key).equals(val) || !overwrite) {
-                return;
-            } else {
-                log.warn("Warning : Multiple values found for " + key
-                        + ". Using value " + val);
-            }
+        if (paramval_containsKey(key) && !overwrite) {
+            return;
         }
 
-        param_source.put(key, val);
-
         val = val.substring(1, val.length()-1); //to remove the backticks
         String sub_val = substitute(val);
         sub_val = executeShellCommand(sub_val);
-        param_val.put(key, sub_val);
+
+        if (paramval_containsKey(key) && !paramval_get(key).equals(sub_val) ) {
+            //(boolean overwrite is always true here)
+            log.warn("Warning : Multiple values found for " + key + " command `" + val + "`. "
+                     + "Previous value " + paramval_get(key) + ", now using value " + sub_val);
+        }
+
+        paramval_put(key, sub_val);
     }
 
     public void validate(String preprocessorCmd) throws FrontendException {
@@ -175,18 +189,18 @@ public class PreprocessorContext {
      */
     public  void processOrdLine(String key, String val, Boolean overwrite)  throws ParameterSubstitutionException {
 
-        if (param_val.containsKey(key)) {
-            if (param_source.get(key).equals(val) || !overwrite) {
+        String sub_val = substitute(val, key);
+        if (paramval_containsKey(key)) {
+            if (paramval_get(key).equals(sub_val) || !overwrite) {
                 return;
             } else {
-                log.warn("Warning : Multiple values found for " + key + ". Using value " + val);
+                log.warn("Warning : Multiple values found for " + key
+                         + ". Previous value " + paramval_get(key)
+                         + ", now using value " + sub_val);
             }
         }
 
-        param_source.put(key, val);
-
-        String sub_val = substitute(val, key);
-        param_val.put(key, sub_val);
+        paramval_put(key, sub_val);
     }
 
 
@@ -318,7 +332,7 @@ public class PreprocessorContext {
         while (bracketKeyMatcher.find()) {
             if ( (bracketKeyMatcher.start() == 0) || (line.charAt( bracketKeyMatcher.start() - 1)) != '\\' ) {
                 key = bracketKeyMatcher.group(1);
-                if (!(param_val.containsKey(key))) {
+                if (!(paramval_containsKey(key))) {
                     String message;
                     if (parentKey == null) {
                         message = "Undefined parameter : " + key;
@@ -327,7 +341,7 @@ public class PreprocessorContext {
                     }
                     throw new ParameterSubstitutionException(message);
                 }
-                val = param_val.get(key);
+                val = paramval_get(key);
                 if (val.contains("$")) {
                     val = val.replaceAll("(?<!\\\\)\\$", "\\\\\\$");
                 }
@@ -345,7 +359,7 @@ public class PreprocessorContext {
             // for escaped vars of the form \$<id>
             if ( (keyMatcher.start() == 0) || (line.charAt( keyMatcher.start() - 1)) != '\\' ) {
                 key = keyMatcher.group(1);
-                if (!(param_val.containsKey(key))) {
+                if (!(paramval_containsKey(key))) {
                     String message;
                     if (parentKey == null) {
                         message = "Undefined parameter : " + key;
@@ -354,7 +368,7 @@ public class PreprocessorContext {
                     }
                     throw new ParameterSubstitutionException(message);
                 }
-                val = param_val.get(key);
+                val = paramval_get(key);
                 if (val.contains("$")) {
                     val = val.replaceAll("(?<!\\\\)\\$", "\\\\\\$");
                 }

Modified: pig/branches/spark/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj Wed Feb 22 09:43:41 2017
@@ -23,6 +23,7 @@ options {
   STATIC = false;
   // Case is ignored in keywords
   IGNORE_CASE = true;
+  // DEBUG_PARSER = true;
   JAVA_UNICODE_ESCAPE = true;
 }
 
@@ -36,7 +37,7 @@ import java.util.List;
 import java.util.ArrayList;
 import org.apache.pig.impl.util.StringUtils;
 
-import jline.ConsoleReader;
+import jline.console.ConsoleReader;
 
 public abstract class PigScriptParser
 {
@@ -217,7 +218,7 @@ TOKEN_MGR_DECLS : {
 		{
 			/*System.err.print(">> ");
 			System.err.flush();*/
-		    consoleReader.setDefaultPrompt(">> ");
+		    consoleReader.setPrompt(">> ");
 		}
 	}
 
@@ -267,7 +268,7 @@ TOKEN_MGR_DECLS : {
 	<"'"> {prevState = PIG_START;} : IN_STRING
 |	<"`"> {prevState = PIG_START;} : IN_COMMAND
 |	<(" " | "\t")+["A","a"]["S","s"](" " | "\t")+ > {prevState = PIG_START;} : SCHEMA_DEFINITION
-|   <(" " | "\t")+["G","g"]["E","e"]["N","n"]["E","e"]["R","r"]["A","a"]["T","t"]["E","e"](" " | "\t")+ > {prevState = PIG_START;} : GENERATE
+|   <(" " | "\t")+["G","g"]["E","e"]["N","n"]["E","e"]["R","r"]["A","a"]["T","t"]["E","e"](" " | "\t" | "\r" | "\n")+ > {prevState = PIG_START;} : GENERATE
 |       <"{"> {pigBlockLevel = 1;} : IN_BLOCK
 |       <"}"> {if (true) throw new TokenMgrError("Unmatched '}'", TokenMgrError.LEXICAL_ERROR);}
 |       <";"> : PIG_END
@@ -292,7 +293,8 @@ TOKEN_MGR_DECLS : {
 
 <IN_STRING> MORE :
 {
-	<"\\'">
+	<"\\\\">
+|	<"\\'">
 |	<"'"> { SwitchTo(prevState);}
 |	<("\n" | "\r" | "\r\n")> {secondary_prompt();}
 |	<(~[])>
@@ -395,7 +397,7 @@ TOKEN_MGR_DECLS : {
 {
 	<"\""> {prevState = IN_BLOCK;} : IN_DOUBLE_QUOTED_STRING
 |	<(" " | "\t")+["A","a"]["S","s"](" " | "\t")+ > {prevState = IN_BLOCK;} : SCHEMA_DEFINITION
-|   <(" " | "\t")+["G","g"]["E","e"]["N","n"]["E","e"]["R","r"]["A","a"]["T","t"]["E","e"](" " | "\t")+> {prevState = IN_BLOCK;} : GENERATE
+|   <(" " | "\t")+["G","g"]["E","e"]["N","n"]["E","e"]["R","r"]["A","a"]["T","t"]["E","e"](" " | "\t" | "\r" | "\n")+> {prevState = IN_BLOCK;} : GENERATE
 |	<"{"> {pigBlockLevel++;}
 |       <"}"(";")?> {pigBlockLevel--; if (pigBlockLevel == 0) SwitchTo(PIG_END);}
 |	<"'"> {prevState = IN_BLOCK;} : IN_STRING

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/EmbeddedPigStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/EmbeddedPigStats.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/EmbeddedPigStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/EmbeddedPigStats.java Wed Feb 22 09:43:41 2017
@@ -147,6 +147,11 @@ final class EmbeddedPigStats extends Pig
     }
 
     @Override
+    public String getDisplayString() {
+        return null;
+    }
+
+    @Override
     public long getProactiveSpillCountObjects() {
         throw new UnsupportedOperationException();
     }

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/EmptyPigStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/EmptyPigStats.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/EmptyPigStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/EmptyPigStats.java Wed Feb 22 09:43:41 2017
@@ -87,6 +87,11 @@ public class EmptyPigStats extends PigSt
     }
 
     @Override
+    public String getDisplayString() {
+        return null;
+    }
+
+    @Override
     public JobGraph getJobGraph() {
        return emptyJobPlan;
     }

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStats.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStats.java Wed Feb 22 09:43:41 2017
@@ -134,6 +134,11 @@ public abstract class PigStats {
     }
 
     /**
+     * Returns the display message in pig grunt
+     */
+    public abstract String getDisplayString();
+
+    /**
      * Returns the DAG of jobs spawned by the script
      */
     public JobGraph getJobGraph() {
@@ -265,6 +270,13 @@ public abstract class PigStats {
         return ScriptState.get().getPigVersion();
     }
 
+    /**
+     *  Returns the contents of the script that was run.
+     */
+    public String getScript() {
+        return ScriptState.get().getScript();
+    }
+
     public String getScriptId() {
         return ScriptState.get().getId();
     }

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatsUtil.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatsUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatsUtil.java Wed Feb 22 09:43:41 2017
@@ -24,7 +24,7 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
 import org.apache.pig.tools.pigstats.mapreduce.SimplePigStats;
 
 /**
@@ -71,7 +71,7 @@ public class PigStatsUtil {
      */
     @Deprecated
     public static final String FS_COUNTER_GROUP
-            = HadoopShims.getFsCounterGroupName();
+            = MRPigStatsUtil.FS_COUNTER_GROUP;
 
     /**
      * Returns an empty PigStats object Use of this method is not advised as it

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/ScriptState.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/ScriptState.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/ScriptState.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/ScriptState.java Wed Feb 22 09:43:41 2017
@@ -133,6 +133,8 @@ public abstract class ScriptState {
         MERGE_SPARSE_JOIN,
         REPLICATED_JOIN,
         SKEWED_JOIN,
+        BUILD_BLOOM,
+        FILTER_BLOOM,
         HASH_JOIN,
         COLLECTED_GROUP,
         MERGE_COGROUP,
@@ -312,7 +314,7 @@ public abstract class ScriptState {
                 maxScriptSize = Integer.valueOf(prop);
             }
         }
-       
+
         this.truncatedScript = (script.length() > maxScriptSize) ? script.substring(0, maxScriptSize)
                                                    : script;
 
@@ -485,6 +487,10 @@ public abstract class ScriptState {
         public void visit(LOJoin op) {
             if (op.getJoinType() == JOINTYPE.HASH) {
                 feature.set(PIG_FEATURE.HASH_JOIN.ordinal());
+            } else if (op.getJoinType() == JOINTYPE.BLOOM) {
+                feature.set(PIG_FEATURE.HASH_JOIN.ordinal());
+                feature.set(PIG_FEATURE.BUILD_BLOOM.ordinal());
+                feature.set(PIG_FEATURE.FILTER_BLOOM.ordinal());
             } else if (op.getJoinType() == JOINTYPE.MERGE) {
                 feature.set(PIG_FEATURE.MERGE_JOIN.ordinal());
             } else if (op.getJoinType() == JOINTYPE.MERGESPARSE) {
@@ -506,6 +512,7 @@ public abstract class ScriptState {
             feature.set(PIG_FEATURE.RANK.ordinal());
         }
 
+        @Override
         public void visit(LOSort op) {
             feature.set(PIG_FEATURE.ORDER_BY.ordinal());
         }

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java Wed Feb 22 09:43:41 2017
@@ -32,15 +32,16 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.TaskReport;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.TaskReport;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigCounters;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
 import org.apache.pig.impl.io.FileSpec;
@@ -53,6 +54,8 @@ import org.apache.pig.tools.pigstats.Out
 import org.apache.pig.tools.pigstats.PigStats.JobGraph;
 import org.apache.pig.tools.pigstats.PigStats.JobGraphPrinter;
 
+import org.python.google.common.collect.Lists;
+
 
 /**
  * This class encapsulates the runtime statistics of a MapReduce job.
@@ -281,7 +284,7 @@ public final class MRJobStats extends Jo
 
     void addCounters(Job job) {
         try {
-            counters = HadoopShims.getCounters(job);
+            counters = getCounters(job);
         } catch (IOException e) {
             LOG.warn("Unable to get job counters", e);
         }
@@ -349,13 +352,13 @@ public final class MRJobStats extends Jo
     void addMapReduceStatistics(Job job) {
         Iterator<TaskReport> maps = null;
         try {
-            maps = HadoopShims.getTaskReports(job, TaskType.MAP);
+            maps = getTaskReports(job, TaskType.MAP);
         } catch (IOException e) {
             LOG.warn("Failed to get map task report", e);
         }
         Iterator<TaskReport> reduces = null;
         try {
-            reduces = HadoopShims.getTaskReports(job, TaskType.REDUCE);
+            reduces = getTaskReports(job, TaskType.REDUCE);
         } catch (IOException e) {
             LOG.warn("Failed to get reduce task report", e);
         }
@@ -515,4 +518,35 @@ public final class MRJobStats extends Jo
         inputs.add(is);
     }
 
+    public static Iterator<TaskReport> getTaskReports(Job job, TaskType type) throws IOException {
+        if (job.getJobConf().getBoolean(PigConfiguration.PIG_NO_TASK_REPORT, false)) {
+            LOG.info("TaskReports are disabled for job: " + job.getAssignedJobID());
+            return null;
+        }
+        Cluster cluster = new Cluster(job.getJobConf());
+        try {
+            org.apache.hadoop.mapreduce.Job mrJob = cluster.getJob(job.getAssignedJobID());
+            if (mrJob == null) { // In local mode, mrJob will be null
+                mrJob = job.getJob();
+            }
+            org.apache.hadoop.mapreduce.TaskReport[] reports = mrJob.getTaskReports(type);
+            return Lists.newArrayList(reports).iterator();
+        } catch (InterruptedException ir) {
+            throw new IOException(ir);
+        }
+    }
+
+    public static Counters getCounters(Job job) throws IOException {
+        try {
+            Cluster cluster = new Cluster(job.getJobConf());
+            org.apache.hadoop.mapreduce.Job mrJob = cluster.getJob(job.getAssignedJobID());
+            if (mrJob == null) { // In local mode, mrJob will be null
+                mrJob = job.getJob();
+            }
+            return new Counters(mrJob.getCounters());
+        } catch (Exception ir) {
+            throw new IOException(ir);
+        }
+    }
+
 }

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java Wed Feb 22 09:43:41 2017
@@ -33,7 +33,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.NativeMapReduceOper;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.classification.InterfaceAudience.Private;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.tools.pigstats.JobStats;
@@ -51,7 +50,7 @@ public class MRPigStatsUtil extends PigS
     public static final String TASK_COUNTER_GROUP
             = "org.apache.hadoop.mapred.Task$Counter";
     public static final String FS_COUNTER_GROUP
-            = HadoopShims.getFsCounterGroupName();
+            = "org.apache.hadoop.mapreduce.FileSystemCounter";
 
     private static final Log LOG = LogFactory.getLog(MRPigStatsUtil.class);
 

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/SimplePigStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/SimplePigStats.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/SimplePigStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/SimplePigStats.java Wed Feb 22 09:43:41 2017
@@ -207,13 +207,18 @@ public final class SimplePigStats extend
     }
 
     void display() {
+        LOG.info(getDisplayString());
+    }
+
+    @Override
+    public String getDisplayString() {
         if (returnCode == ReturnCode.UNKNOWN) {
             LOG.warn("unknown return code, can't display the results");
-            return;
+            return "";
         }
         if (pigContext == null) {
             LOG.warn("unknown exec type, don't display the results");
-            return;
+            return "";
         }
 
         SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT);
@@ -276,7 +281,7 @@ public final class SimplePigStats extend
 
         sb.append("\nJob DAG:\n").append(jobPlan.toString());
 
-        LOG.info("Script Statistics: \n" + sb.toString());
+        return "Script Statistics: \n" + sb.toString();
     }
 
     void mapMROperToJob(MapReduceOper mro, Job job) {

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java Wed Feb 22 09:43:41 2017
@@ -23,7 +23,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -40,7 +39,6 @@ import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.tools.pigstats.InputStats;
 import org.apache.pig.tools.pigstats.JobStats;
-import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.ScriptState;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -115,29 +113,35 @@ public class SparkPigStats extends PigSt
     }
 
     private void display() {
+        LOG.info(getDisplayString());
+    }
+
+    public String getDisplayString() {
+        StringBuilder sb = new StringBuilder();
         Iterator<JobStats> iter = jobPlan.iterator();
         while (iter.hasNext()) {
-            SparkJobStats js = (SparkJobStats)iter.next();
+            SparkJobStats js = (SparkJobStats) iter.next();
             if (jobSparkOperatorMap.containsKey(js)) {
                 SparkOperator sparkOperator = jobSparkOperatorMap.get(js);
                 js.setAlias(sparkOperator);
             }
-            LOG.info( "Spark Job [" + js.getJobId() + "] Metrics");
+            sb.append("Spark Job [" + js.getJobId() + "] Metrics");
             Map<String, Long> stats = js.getStats();
             if (stats == null) {
-                LOG.info("No statistics found for job " + js.getJobId());
-                return;
+                sb.append("No statistics found for job " + js.getJobId());
+                return sb.toString();
             }
 
             Iterator statIt = stats.entrySet().iterator();
             while (statIt.hasNext()) {
-                Map.Entry pairs = (Map.Entry)statIt.next();
-                LOG.info("\t" + pairs.getKey() + " : " + pairs.getValue());
+                Map.Entry pairs = (Map.Entry) statIt.next();
+                sb.append("\t" + pairs.getKey() + " : " + pairs.getValue());
             }
-            for (InputStats inputStat : js.getInputs()){
-                LOG.info("\t"+inputStat.getDisplayString());
+            for (InputStats inputStat : js.getInputs()) {
+                sb.append("\t" + inputStat.getDisplayString());
             }
         }
+        return sb.toString();
     }
 
     @Override
@@ -217,4 +221,4 @@ public class SparkPigStats extends PigSt
         sparkOperatorsSet.add(sparkOperator);
     }
 
-}
+}
\ No newline at end of file

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java Wed Feb 22 09:43:41 2017
@@ -18,12 +18,8 @@
 
 package org.apache.pig.tools.pigstats.spark;
 
-import java.util.List;
-
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
@@ -101,21 +97,7 @@ public class SparkStatsUtil {
 
     public static long getLoadSparkCounterValue(POLoad load) {
         SparkPigStatusReporter reporter = SparkPigStatusReporter.getInstance();
-        int loadersCount = countCoLoadsIfInSplit(load,load.getParentPlan());
-        return reporter.getCounters().getValue(SPARK_INPUT_COUNTER_GROUP, getLoadSparkCounterName(load))/loadersCount;
-    }
-
-    private static int countCoLoadsIfInSplit(PhysicalOperator op, PhysicalPlan pp){
-        List<PhysicalOperator> successors = pp.getSuccessors(op);
-        if (successors == null || successors.size()==0) return 1;
-        for (PhysicalOperator successor : successors){
-            if (successor instanceof POSplit){
-                return ((POSplit)successor).getPlans().size();
-            }else{
-                return countCoLoadsIfInSplit(successor,pp);
-            }
-        }
-        return 1;
+        return reporter.getCounters().getValue(SPARK_INPUT_COUNTER_GROUP, getLoadSparkCounterName(load));
     }
 
     public static boolean isJobSuccess(int jobID,

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java Wed Feb 22 09:43:41 2017
@@ -245,7 +245,11 @@ public class TezDAGStats extends JobStat
                             OutputStats existingOut = outputsByLocation.get(output.getLocation());
                             // In case of multistore, bytesWritten is already calculated
                             // from size of all the files in the output directory.
-                            if (!output.getPOStore().isMultiStore() && output.getBytes() > -1) {
+                            // So use that if there is a combination of multistore and single store
+                            if (output.getPOStore().isMultiStore()) {
+                                existingOut.setBytes(output.getBytes());
+                                existingOut.setPOStore(output.getPOStore());
+                            } else if (!existingOut.getPOStore().isMultiStore() && output.getBytes() > -1) {
                                 long bytes = existingOut.getBytes() > -1
                                         ? (existingOut.getBytes() + output.getBytes())
                                         : output.getBytes();

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java Wed Feb 22 09:43:41 2017
@@ -117,6 +117,11 @@ public class TezPigScriptStats extends P
     }
 
     private void display() {
+        LOG.info(getDisplayString());
+    }
+
+    @Override
+    public String getDisplayString() {
         SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT);
         StringBuilder sb = new StringBuilder();
         sb.append("\n");
@@ -170,7 +175,7 @@ public class TezPigScriptStats extends P
         for (OutputStats os : getOutputStats()) {
             sb.append(os.getDisplayString().trim()).append("\n");
         }
-        LOG.info("Script Statistics:\n" + sb.toString());
+        return "Script Statistics:\n" + sb.toString();
     }
 
     /**

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java Wed Feb 22 09:43:41 2017
@@ -275,6 +275,12 @@ public class TezScriptState extends Scri
                 if (tezOp.isRegularJoin()) {
                     feature.set(PIG_FEATURE.HASH_JOIN.ordinal());
                 }
+                if (tezOp.isBuildBloom()) {
+                    feature.set(PIG_FEATURE.BUILD_BLOOM.ordinal());
+                }
+                if (tezOp.isFilterBloom()) {
+                    feature.set(PIG_FEATURE.FILTER_BLOOM.ordinal());
+                }
                 if (tezOp.isUnion()) {
                     feature.set(PIG_FEATURE.UNION.ordinal());
                 }

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java Wed Feb 22 09:43:41 2017
@@ -22,6 +22,7 @@ import static org.apache.pig.tools.pigst
 import static org.apache.pig.tools.pigstats.tez.TezDAGStats.TASK_COUNTER_GROUP;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -289,13 +290,19 @@ public class TezVertexStats extends JobS
         }
 
         // Split followed by union will have multiple stores writing to same location
-        Map<String, POStore> uniqueOutputs = new HashMap<String, POStore>();
+        Map<String, List<POStore>> uniqueOutputs = new HashMap<String, List<POStore>>();
         for (POStore sto : stores) {
             POStoreTez store = (POStoreTez) sto;
-            uniqueOutputs.put(store.getOutputKey(), store);
+            List<POStore> stores = uniqueOutputs.get(store.getOutputKey());
+            if (stores == null) {
+                stores = new ArrayList<POStore>();
+            }
+            stores.add(store);
+            uniqueOutputs.put(store.getOutputKey(), stores);
         }
 
-        for (POStore sto : uniqueOutputs.values()) {
+        for (List<POStore> stores : uniqueOutputs.values()) {
+            POStore sto = stores.get(0);
             if (sto.isTmpStore()) {
                 continue;
             }
@@ -304,11 +311,16 @@ public class TezVertexStats extends JobS
             String filename = sto.getSFile().getFileName();
             if (counters != null) {
                 if (msGroup != null) {
-                    Long n = msGroup.get(PigStatsUtil.getMultiStoreCounterName(sto));
-                    if (n != null) records = n;
-                }
-                if (records == -1) {
-                    records = outputRecords;
+                    long n = 0;
+                    Long val = null;
+                    for (POStore store : stores) {
+                        val = msGroup.get(PigStatsUtil.getMultiStoreCounterName(store));
+                        // Tez removes 0 value counters for efficiency.
+                        if (val != null) {
+                            n += val;
+                        }
+                    }
+                    records = n;
                 }
                 if (isSuccessful() && records == -1) {
                     // Tez removes 0 value counters for efficiency.
@@ -338,13 +350,13 @@ public class TezVertexStats extends JobS
     @Override
     @Deprecated
     public int getNumberMaps() {
-        return this.isMapOpts ? numTasks : -1;
+        return this.isMapOpts ? numTasks : 0;
     }
 
     @Override
     @Deprecated
     public int getNumberReduces() {
-        return this.isMapOpts ? -1 : numTasks;
+        return this.isMapOpts ? 0 : numTasks;
     }
 
     @Override
@@ -386,25 +398,25 @@ public class TezVertexStats extends JobS
     @Override
     @Deprecated
     public long getMapInputRecords() {
-        return this.isMapOpts ? numInputRecords : -1;
+        return this.isMapOpts ? numInputRecords : 0;
     }
 
     @Override
     @Deprecated
     public long getMapOutputRecords() {
-        return this.isMapOpts ? numOutputRecords : -1;
+        return this.isMapOpts ? numOutputRecords : 0;
     }
 
     @Override
     @Deprecated
     public long getReduceInputRecords() {
-        return this.isMapOpts ? -1 : numInputRecords;
+        return numReduceInputRecords;
     }
 
     @Override
     @Deprecated
     public long getReduceOutputRecords() {
-        return this.isMapOpts ? -1 : numOutputRecords;
+        return this.isMapOpts ? 0 : numOutputRecords;
     }
 
     @Override

Modified: pig/branches/spark/src/pig-default.properties
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/pig-default.properties?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/pig-default.properties (original)
+++ pig/branches/spark/src/pig-default.properties Wed Feb 22 09:43:41 2017
@@ -61,4 +61,8 @@ pig.stats.output.size.reader.unsupported
 
 pig.tez.opt.union.unsupported.storefuncs=org.apache.hcatalog.pig.HCatStorer,org.apache.hive.hcatalog.pig.HCatStorer,org.apache.pig.piggybank.storage.DBStorage,org.apache.pig.piggybank.storage.MultiStorage
 
-pig.sort.readonce.loadfuncs=org.apache.pig.backend.hadoop.hbase.HBaseStorage,org.apache.pig.backend.hadoop.accumulo.AccumuloStorage
\ No newline at end of file
+pig.sort.readonce.loadfuncs=org.apache.pig.backend.hadoop.hbase.HBaseStorage,org.apache.pig.backend.hadoop.accumulo.AccumuloStorage
+
+pig.ats.enabled=true
+
+pig.tez.configure.am.memory=true

Added: pig/branches/spark/start-build-env.sh
URL: http://svn.apache.org/viewvc/pig/branches/spark/start-build-env.sh?rev=1783988&view=auto
==============================================================================
--- pig/branches/spark/start-build-env.sh (added)
+++ pig/branches/spark/start-build-env.sh Wed Feb 22 09:43:41 2017
@@ -0,0 +1,63 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -e               # exit on error
+
+cd "$(dirname "$0")" # connect to root
+
+docker build -t pig-build dev-support/docker
+
+if [ "$(uname -s)" == "Linux" ]; then
+  USER_NAME=${SUDO_USER:=${USER}}
+  USER_ID=$(id -u "${USER_NAME}")
+  GROUP_ID=$(id -g "${USER_NAME}")
+else # boot2docker uid and gid
+  USER_NAME=${USER}
+  USER_ID=1000
+  GROUP_ID=50
+fi
+
+docker build -t "pig-build-${USER_NAME}" - <<UserSpecificDocker
+FROM pig-build
+RUN bash configure-for-user.sh ${USER_NAME} ${USER_ID} ${GROUP_ID} "$(fgrep vboxsf /etc/group)"
+UserSpecificDocker
+
+# By mapping the .m2 directory you can do an mvn install from
+# within the container and use the result on your normal
+# system. This also is a significant speedup in subsequent
+# builds because the dependencies are downloaded only once.
+# Same with the .ivy2 directory
+
+DOCKER="docker run --rm=true -t -i"
+DOCKER=${DOCKER}" -u ${USER_NAME}"
+
+# Work in the current directory
+DOCKER=${DOCKER}" -v ${PWD}:/home/${USER_NAME}/pig"
+DOCKER=${DOCKER}" -w /home/${USER_NAME}/pig"
+
+# Mount persistent caching of 'large' downloads
+DOCKER=${DOCKER}" -v ${HOME}/.m2:/home/${USER_NAME}/.m2"
+DOCKER=${DOCKER}" -v ${HOME}/.ivy2:/home/${USER_NAME}/.ivy2"
+
+# What do we run?
+DOCKER=${DOCKER}" --name pig-build-${USER_NAME}-$$"
+DOCKER=${DOCKER}" pig-build-${USER_NAME}"
+DOCKER=${DOCKER}" bash"
+
+# Now actually start it
+${DOCKER}
+

Modified: pig/branches/spark/test/e2e/pig/build.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/build.xml?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/build.xml (original)
+++ pig/branches/spark/test/e2e/pig/build.xml Wed Feb 22 09:43:41 2017
@@ -27,9 +27,8 @@
   <property name="hive.lib.dir"
 	value="${pig.base.dir}/build/ivy/lib/Pig"/>
 
-  <condition property="hive.hadoop.shims.version" value="0.23" else="0.20S">
-    <equals arg1="${hadoopversion}" arg2="23" />
-  </condition>
+  <property name="hadoopversion" value="2" />
+  <property name="hive.hadoop.shims.version" value="0.23" />
 
   <property name="mvnrepo" value="http://repo2.maven.org/maven2"/>
 
@@ -61,6 +60,7 @@
   <property name="harness.PH_LOCAL" value="."/>
   <property name="harness.PH_OUT" value="."/>
   <property name="harness.PERL5LIB" value="./libexec"/>
+  <property name="harness.user.home" value="/user/pig" />
 
   <property name="test.location" value="${basedir}/testdist"/>
   <property name="benchmark.location" value="${test.location}/benchmarks"/>
@@ -137,6 +137,7 @@
       <path path="${test.location}/tests/multiquery.conf"/>
       <path path="${test.location}/tests/negative.conf"/>
       <path path="${test.location}/tests/nightly.conf"/>
+      <path path="${test.location}/tests/join.conf"/>
       <path path="${test.location}/tests/streaming.conf"/>
       <path path="${test.location}/tests/streaming_local.conf"/>
       <path path="${test.location}/tests/turing_jython.conf"/>
@@ -309,6 +310,7 @@
       <env key="PH_HIVE_LIB_DIR" value="${hive.lib.dir}"/>
       <env key="PH_HIVE_VERSION" value="${hive.version}"/>
       <env key="PH_HIVE_SHIMS_VERSION" value="${hive.hadoop.shims.version}"/>
+      <env key="PH_HDFS_BASE" value="${harness.user.home}" />
       <env key="HARNESS_CONF" value="${harness.conf.file}"/>
       <env key="HADOOP_HOME" value="${harness.hadoop.home}"/>
       <env key="HADOOP_PREFIX" value="${HADOOP_PREFIX}"/>
@@ -369,6 +371,7 @@
       <env key="PH_CLUSTER_BIN" value="${harness.cluster.bin}"/>
       <env key="HARNESS_CONF" value="${harness.conf.file}"/>
       <env key="HADOOP_HOME" value="${harness.hadoop.home}"/>
+      <env key="PH_HDFS_BASE" value="${harness.user.home}" />
 
       <arg value="./test_harness.pl"/>
       <arg value="-deploycfg"/>

Modified: pig/branches/spark/test/e2e/pig/conf/spark.conf
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/conf/spark.conf?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/conf/spark.conf (original)
+++ pig/branches/spark/test/e2e/pig/conf/spark.conf Wed Feb 22 09:43:41 2017
@@ -30,8 +30,8 @@ my $hdfsBase = $ENV{PH_HDFS_BASE} || "/u
 
 $cfg = {
     #HDFS
-      'inpathbase'     => "$ENV{PH_ROOT}/data"
-    , 'outpathbase'    => "$ENV{PH_ROOT}/testout"
+      'inpathbase'     => "$hdfsBase/test/data"
+    , 'outpathbase'    => "$hdfsBase/out"
 
    #LOCAL
     , 'localinpathbase'   => "$ENV{PH_LOCAL}/in"
@@ -55,7 +55,7 @@ $cfg = {
     , 'hcatbin'          => "$ENV{HCAT_BIN}"
     , 'usePython'        => "$ENV{PIG_USE_PYTHON}"
     , 'exectype'         => 'spark'
-    , 'benchmark_exectype'         => 'local'
+    , 'benchmark_exectype'         => 'mapred'
 
     #HADOOP
     , 'mapredjars'       => "$ENV{PH_ROOT}/lib"

Modified: pig/branches/spark/test/e2e/pig/deployers/ExistingClusterDeployer.pm
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/deployers/ExistingClusterDeployer.pm?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/deployers/ExistingClusterDeployer.pm (original)
+++ pig/branches/spark/test/e2e/pig/deployers/ExistingClusterDeployer.pm Wed Feb 22 09:43:41 2017
@@ -231,11 +231,6 @@ sub generateData
             'rows' => 5000,
             'hdfs' => "types/numbers.txt",
         }, {
-            'name' => "biggish",
-            'filetype' => "biggish",
-            'rows' => 1000000,
-            'hdfs' => "singlefile/biggish",
-        }, {
             'name' => "prerank",
             'filetype' => "ranking",
             'rows' => 30,

Modified: pig/branches/spark/test/e2e/pig/deployers/LocalDeployer.pm
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/deployers/LocalDeployer.pm?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/deployers/LocalDeployer.pm (original)
+++ pig/branches/spark/test/e2e/pig/deployers/LocalDeployer.pm Wed Feb 22 09:43:41 2017
@@ -209,11 +209,21 @@ sub generateData
             'filetype' => "ranking",
             'rows' => 30,
             'outfile' => "singlefile/prerank",
+        }, {
+            'name' => "utf8Voter",
+            'filetype' => "utf8Voter",
+            'rows' => 30,
+            'outfile' => "utf8Data/选民/utf8Voter",
+        }, {
+            'name' => "utf8Student",
+            'filetype' => "utf8Student",
+            'rows' => 300,
+            'outfile' => "utf8Data/学生/utf8Student",
         }
     );
 
 	# Create the target directories
-    for my $dir ("singlefile", "dir", "types", "glob/star/somegood",
+    for my $dir ("singlefile", "utf8Data/选民", "utf8Data/学生", "dir", "types", "glob/star/somegood",
             "glob/star/moregood", "glob/star/bad") {
         my @cmd = ("mkdir", "-p", "$cfg->{'inpathbase'}/$dir");
 	    $self->runCmd($log, \@cmd);

Modified: pig/branches/spark/test/e2e/pig/drivers/TestDriverPig.pm
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/drivers/TestDriverPig.pm?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/drivers/TestDriverPig.pm (original)
+++ pig/branches/spark/test/e2e/pig/drivers/TestDriverPig.pm Wed Feb 22 09:43:41 2017
@@ -211,13 +211,6 @@ sub runTest
            $testCmd->{'pig'} = $testCmd->{'pig_win'};
        }
 
-       if ( $testCmd->{'hadoopversion'} == '23' && $testCmd->{'pig23'}) {
-           $oldpig = $testCmd->{'pig'};
-           $testCmd->{'pig'} = $testCmd->{'pig23'};
-       }
-       if ( $testCmd->{'hadoopversion'} == '23' && $testCmd->{'expected_err_regex23'}) {
-           $testCmd->{'expected_err_regex'} = $testCmd->{'expected_err_regex23'};
-       }
        my $res = $self->runPigCmdLine( $testCmd, $log, 1, $resources );
        if ($oldpig) {
            $testCmd->{'pig'} = $oldpig;
@@ -231,10 +224,6 @@ sub runTest
            $testCmd->{'pig'} = $testCmd->{'pig_win'};
        }
 
-       if ( $testCmd->{'hadoopversion'} == '23' && $testCmd->{'pig23'}) {
-           $oldpig = $testCmd->{'pig'};
-           $testCmd->{'pig'} = $testCmd->{'pig23'};
-       }
        my $res = $self->runPig( $testCmd, $log, 1, $resources );
        if ($oldpig) {
            $testCmd->{'pig'} = $oldpig;
@@ -686,9 +675,6 @@ sub generateBenchmark
         if ((Util::isWindows()||Util::isCygwin()) && $testCmd->{'pig_win'}) {
            $modifiedTestCmd{'pig'} = $testCmd->{'pig_win'};
        }
-	   if ( $testCmd->{'hadoopversion'} == '23' && $testCmd->{'pig23'}) {
-           $modifiedTestCmd{'pig'} = $testCmd->{'pig23'};
-       }
 		# Change so we're looking at the old version of Pig
                 if (defined $testCmd->{'oldpigpath'} && $testCmd->{'oldpigpath'} ne "") {
 		    $modifiedTestCmd{'pigpath'} = $testCmd->{'oldpigpath'};
@@ -1058,10 +1044,6 @@ sub wrongExecutionMode($$)
         }
     }
 
-    if (defined $testCmd->{'ignore23'} && $testCmd->{'hadoopversion'}=='23') {
-        $wrong = 1;
-    }
-
     if ($wrong) {
         print $log "Skipping test $testCmd->{'group'}" . "_" .
            $testCmd->{'num'} . " since it is not supposed to be run in hadoop 23\n";

Modified: pig/branches/spark/test/e2e/pig/streaming/PigStreaming.pl
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/streaming/PigStreaming.pl?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/streaming/PigStreaming.pl (original)
+++ pig/branches/spark/test/e2e/pig/streaming/PigStreaming.pl Wed Feb 22 09:43:41 2017
@@ -73,7 +73,7 @@ while (<$input_handle>)
 {
 	chomp;	
 	$data = $_;
-	if (defined(%hash) && (exists $hash{$data}))
+	if (exists $hash{$data})
 	{
 		print $output_handle "$hash{$data}\n";		
 	}

Modified: pig/branches/spark/test/e2e/pig/tests/grunt.conf
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/grunt.conf?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/tests/grunt.conf (original)
+++ pig/branches/spark/test/e2e/pig/tests/grunt.conf Wed Feb 22 09:43:41 2017
@@ -46,7 +46,12 @@ $cfg = {
                         'execonly' => 'mapred,tez', # don't have a clue what their cwd will be for local mode
                         'expected_out_regex' => "/user",
                         'rc' => 0
-
+                      },{
+                        'num' => 3,
+                        'pig' => "ls .",
+                        'execonly' => 'mapred,tez',
+                        'expected_out_regex' => "/user",
+                        'rc' => 0
                       },{
                         'num' => 4,
                         'pig' => "ls :INPATH:",
@@ -77,21 +82,22 @@ $cfg = {
                             'rc' => 0
                         },{
                             'num' => 10,
-                            'pig' => "cp :INPATH:/singlefile/studenttab10k .
-                                      ls .",
+                            'pig' => "mkdir :OUTPATH:
+                                      cp :INPATH:/singlefile/studenttab10k :OUTPATH:
+                                      ls :OUTPATH:",
                             'expected_out_regex' => ".*studenttab10k",
                             'rc' => 0
                         },{
                             'num' => 11,
-                            'pig' => "cp :INPATH:/singlefile/studenttab10k ./fred
-                                      ls .",
+                            'pig' => "cp :INPATH:/singlefile/studenttab10k :OUTPATH:/fred
+                                      ls :OUTPATH:",
                             'expected_out_regex' => ".*fred",
                             'rc' => 0
                         },{
                             'num' => 12,
-                            'pig' => "cp :INPATH:/singlefile/studenttab10k ./jim
-                                      mv ./jim ./bob
-                                      ls .",
+                            'pig' => "cp :INPATH:/singlefile/studenttab10k :OUTPATH:/jim
+                                      mv :OUTPATH:/jim :OUTPATH:/bob
+                                      ls :OUTPATH:",
                             'expected_out_regex' => ".*bob",
                             'rc' => 0
                         },{
@@ -103,18 +109,19 @@ $cfg = {
                         },{
                             'num' => 14,
                             'pig' => "copyToLocal :INPATH:/singlefile/votertab10k :TMP:
-                                      copyFromLocal :TMP:/votertab10k ./joe
-                                      cat ./joe",
+                                      copyFromLocal :TMP:/votertab10k :OUTPATH:/joe
+                                      cat :OUTPATH:/joe",
                             'expected_out_regex' => ":Grunt_14_output:",
                             'rc' => 0
                         },{
                             'num' => 15,
-                            'pig' => "rm fred bob joe",
-                            'not_expected_out_regex' => "joe",
+                            'pig' => "cp :INPATH:/singlefile/studenttab10k :OUTPATH:/fred
+                                      rm :OUTPATH:/fred",
+                            'not_expected_out_regex' => "fred",
                             'rc' => 0
                         },{
                             'num' => 16,
-                            'pig' => "rmf jill",
+                            'pig' => "rmf :OUTPATH:/jill",
                             'rc' => 0
                         }
                 ]

Modified: pig/branches/spark/test/e2e/pig/tests/hcat.conf
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/hcat.conf?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/tests/hcat.conf (original)
+++ pig/branches/spark/test/e2e/pig/tests/hcat.conf Wed Feb 22 09:43:41 2017
@@ -44,7 +44,7 @@ stored as textfile;\,
 			'num' => 2,
 			'java_params' => ['-Dhcat.bin=:HCATBIN:'],
 			'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
-SQL drop table if exists pig_hcat_ddl_1;
+SQL drop table if exists pig_hcat_ddl_1 purge;
 sql create table pig_hcat_ddl_1(name string,
 age int,
 gpa double)
@@ -55,6 +55,35 @@ store a into ':OUTPATH:';\,
 			},
 		]
 		},
+                {
+                'name' => 'Jython_HCatDDL',
+                'tests' => [
+                    {
+                        # sql command
+                                'num' => 1
+                                ,'java_params' => ['-Dhcat.bin=:HCATBIN:']
+                                ,'pig' => q\#!/usr/bin/python
+from org.apache.pig.scripting import Pig
+
+#create pig script
+
+Pig.sql("""sql drop table if exists pig_script_hcat_ddl_1;""")
+ret = Pig.sql("""sql create table pig_script_hcat_ddl_1(name string,
+age int,
+gpa double)
+stored as textfile;
+""")
+
+if ret==0:
+    print "SQL command PASSED"
+
+else:
+    raise "SQL command FAILED"
+\
+                       ,'rc' => 0
+                    },
+               ]
+               },
 	]
 }
 ;

Added: pig/branches/spark/test/e2e/pig/tests/join.conf
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/join.conf?rev=1783988&view=auto
==============================================================================
--- pig/branches/spark/test/e2e/pig/tests/join.conf (added)
+++ pig/branches/spark/test/e2e/pig/tests/join.conf Wed Feb 22 09:43:41 2017
@@ -0,0 +1,310 @@
+#!/usr/bin/env perl
+############################################################################           
+#  Licensed to the Apache Software Foundation (ASF) under one or more                  
+#  contributor license agreements.  See the NOTICE file distributed with               
+#  this work for additional information regarding copyright ownership.                 
+#  The ASF licenses this file to You under the Apache License, Version 2.0             
+#  (the "License"); you may not use this file except in compliance with                
+#  the License.  You may obtain a copy of the License at                               
+#                                                                                      
+#      http://www.apache.org/licenses/LICENSE-2.0                                      
+#                                                                                      
+#  Unless required by applicable law or agreed to in writing, software                 
+#  distributed under the License is distributed on an "AS IS" BASIS,                   
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.            
+#  See the License for the specific language governing permissions and                 
+#  limitations under the License.                                                      
+                                                                                       
+###############################################################################
+
+$cfg = {
+    'driver' => 'Pig',
+
+    'groups' => [
+        {
+        'name' => 'BloomJoin_Map',
+        'execonly' => 'tez',
+        'tests' => [
+            {
+            # Tuple join key
+            'num' => 1,
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
+--c = filter a by age < 20;
+--d = filter b by age < 20;
+e = join a by (name, age), b by (name, age) using 'bloom';
+store e into ':OUTPATH:';\,
+            'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
+--c = filter a by age < 20;
+--d = filter b by age < 20;
+e = join a by (name, age), b by (name, age);
+store e into ':OUTPATH:';\,
+            },
+            {
+            # bytearray join key
+            'num' => 2,
+            'pig' => q\
+SET mapreduce.input.fileinputformat.split.maxsize '50000';
+SET pig.splitCombination false;
+a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
+c = filter a by age < 20;
+d = filter b by age < 20;
+e = join c by name, d by name using 'bloom';
+store e into ':OUTPATH:';\,
+            'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
+c = filter a by age < 20;
+d = filter b by age < 20;
+e = join c by name, d by name;
+store e into ':OUTPATH:';\,
+            },
+            {
+            # Left outer join and chararray join key
+            'num' => 3,
+            'pig' => q\
+SET mapreduce.input.fileinputformat.split.maxsize '50000';
+SET pig.splitCombination false;
+a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name:chararray, age, registration, contributions);
+c = join a by name left, b by name using 'bloom';
+d = foreach c generate a::name, a::age, gpa, registration, contributions;
+store d into ':OUTPATH:';\,
+            'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name:chararray, age, registration, contributions);
+c = join a by name left, b by name;
+d = foreach c generate a::name, a::age, gpa, registration, contributions;
+store d into ':OUTPATH:';\,
+            },
+            {
+            # Right outer join
+            'num' => 4,
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name:chararray, age:int, registration, contributions);
+c = join a by (name,age) right, b by (name,age) using 'bloom';
+store c into ':OUTPATH:';\,
+            'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name:chararray, age:int, registration, contributions);
+c = join a by (name,age) right, b by (name,age);
+store c into ':OUTPATH:';\,
+            },
+            {
+            # Left input from a union
+            'num' => 5,
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name:chararray, age:int, gpa);
+c = union a, b;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+d = filter d by age > 60;
+e = join c by name, d by name using 'bloom' PARALLEL 3;
+store e into ':OUTPATH:';\,
+            'verify_pig_script' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name:chararray, age:int, gpa);
+c = union a, b;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+d = filter d by age > 60;
+e = join c by name, d by name;
+store e into ':OUTPATH:';\,
+            },
+            {
+            # Right input from a union and integer join key
+            'num' => 6,
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name:chararray, age:int, gpa);
+c = union a, b;
+c = filter c by age > 75;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+e = join d by age, c by age using 'bloom' PARALLEL 3;
+store e into ':OUTPATH:';\,
+            'verify_pig_script' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name:chararray, age:int, gpa);
+c = union a, b;
+c = filter c by age > 75;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+e = join d by age, c by age;
+store e into ':OUTPATH:';\,
+            },
+            {
+            # Left input from a split
+            'num' => 7,
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age:int, registration, contributions);
+b = filter b by age > 75;
+c = filter a by age > 50;
+d = join a by age, b by age using 'bloom';
+store c into ':OUTPATH:.1';
+store d into ':OUTPATH:.2';\,
+            'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age:int, registration, contributions);
+b = filter b by age > 75;
+c = filter a by age > 50;
+d = join a by age, b by age;
+store c into ':OUTPATH:.1';
+store d into ':OUTPATH:.2';\,
+            },
+            {
+            # Right input from a split
+            'num' => 8,
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age:int, registration, contributions);
+c = filter a by age > 75;
+d = filter a by name == 'nick miller';
+e = join b by age, c by age using 'bloom';
+store d into ':OUTPATH:.1';
+store e into ':OUTPATH:.2';\,
+            'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age:int, registration, contributions);
+c = filter a by age > 75;
+d = filter a by name == 'nick miller';
+e = join b by age, c by age;
+store d into ':OUTPATH:.1';
+store e into ':OUTPATH:.2';\,
+            },
+        ] # end of tests
+        },
+        {
+        'name' => 'BloomJoin_Reduce',
+        'execonly' => 'tez',
+        'java_params' => ['-Dpig.bloomjoin.strategy=reduce'],
+        'tests' => [
+            {
+            # Tuple join key
+            'num' => 1,
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
+--c = filter a by age < 20;
+--d = filter b by age < 20;
+e = join a by (name, age), b by (name, age) using 'bloom';
+store e into ':OUTPATH:';\,
+            'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
+--c = filter a by age < 20;
+--d = filter b by age < 20;
+e = join a by (name, age), b by (name, age);
+store e into ':OUTPATH:';\,
+            },
+            {
+            # bytearray join key
+            'num' => 2,
+            'pig' => q\
+SET mapreduce.input.fileinputformat.split.maxsize '50000';
+SET pig.splitCombination false;
+a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
+c = filter a by age < 20;
+d = filter b by age < 20;
+e = join c by name, d by name using 'bloom';
+store e into ':OUTPATH:';\,
+            'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
+c = filter a by age < 20;
+d = filter b by age < 20;
+e = join c by name, d by name;
+store e into ':OUTPATH:';\,
+            },
+            {
+            # Left outer join and chararray join key
+            'num' => 3,
+            'pig' => q\
+SET mapreduce.input.fileinputformat.split.maxsize '50000';
+SET pig.splitCombination false;
+a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name:chararray, age, registration, contributions);
+c = join a by name left, b by name using 'bloom';
+d = foreach c generate a::name, a::age, gpa, registration, contributions;
+store d into ':OUTPATH:';\,
+            'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name:chararray, age, registration, contributions);
+c = join a by name left, b by name;
+d = foreach c generate a::name, a::age, gpa, registration, contributions;
+store d into ':OUTPATH:';\,
+            },
+            {
+            # Right outer join
+            'num' => 4,
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name:chararray, age:int, registration, contributions);
+c = join a by (name,age) right, b by (name,age) using 'bloom';
+store c into ':OUTPATH:';\,
+            'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name:chararray, age:int, registration, contributions);
+c = join a by (name,age) right, b by (name,age);
+store c into ':OUTPATH:';\,
+            },
+            {
+            # Left input from a union
+            'num' => 5,
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name:chararray, age:int, gpa);
+c = union a, b;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+d = filter d by age > 60;
+e = join c by name, d by name using 'bloom' PARALLEL 3;
+store e into ':OUTPATH:';\,
+            'verify_pig_script' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name:chararray, age:int, gpa);
+c = union a, b;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+d = filter d by age > 60;
+e = join c by name, d by name;
+store e into ':OUTPATH:';\,
+            },
+            {
+            # Right input from a union and integer join key
+            'num' => 6,
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name:chararray, age:int, gpa);
+c = union a, b;
+c = filter c by age > 75;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+e = join d by age, c by age using 'bloom' PARALLEL 3;
+store e into ':OUTPATH:';\,
+            'verify_pig_script' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name:chararray, age:int, gpa);
+c = union a, b;
+c = filter c by age > 75;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+e = join d by age, c by age;
+store e into ':OUTPATH:';\,
+            },
+            {
+            # Left input from a split
+            'num' => 7,
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age:int, registration, contributions);
+b = filter b by age > 75;
+c = filter a by age > 50;
+d = join a by age, b by age using 'bloom';
+store c into ':OUTPATH:.1';
+store d into ':OUTPATH:.2';\,
+            'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age:int, registration, contributions);
+b = filter b by age > 75;
+c = filter a by age > 50;
+d = join a by age, b by age;
+store c into ':OUTPATH:.1';
+store d into ':OUTPATH:.2';\,
+            },
+            {
+            # Right input from a split
+            'num' => 8,
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age:int, registration, contributions);
+c = filter a by age > 75;
+d = filter a by name == 'nick miller';
+e = join b by age, c by age using 'bloom';
+store d into ':OUTPATH:.1';
+store e into ':OUTPATH:.2';\,
+            'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age:int, registration, contributions);
+c = filter a by age > 75;
+d = filter a by name == 'nick miller';
+e = join b by age, c by age;
+store d into ':OUTPATH:.1';
+store e into ':OUTPATH:.2';\,
+            },
+        ] # end of tests
+        }
+    ] # end of groups
+};
\ No newline at end of file

Modified: pig/branches/spark/test/e2e/pig/tests/multiquery.conf
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/multiquery.conf?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/tests/multiquery.conf (original)
+++ pig/branches/spark/test/e2e/pig/tests/multiquery.conf Wed Feb 22 09:43:41 2017
@@ -728,6 +728,52 @@ b = union a1, a2;
 c = rank b by name ASC, age DESC DENSE;  
 store c into ':OUTPATH:';\,
             },
+            {
+            # Union + Split + Two replicate join
+            'num' => 12,
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+a1 = filter a by gpa is null or gpa <= 3.9;
+a2 = filter a by gpa < 2;
+b = union a1, a2;
+c = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
+c1 = filter c by age < 30;
+c2 = filter c by age > 50;
+d = join b by name, c1 by name using 'replicated';
+e = join d by b::name, c2 by name using 'replicated';
+store e into ':OUTPATH:';\,
+            },
+            {
+            # Multiple Union + Multiple Split + Single store
+            'num' => 13,
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa);
+b = load ':INPATH:/singlefile/studenttab10k' as (name, age:int, gpa);
+u1 = union onschema a, b;
+SPLIT u1 INTO r IF age < 30, s OTHERWISE;
+c = load ':INPATH:/singlefile/voternulltab10k' as (votername, voterage, registration, contributions);
+d = JOIN r BY name LEFT, c BY votername;
+u2 = UNION ONSCHEMA d, s;
+e = FILTER u2 BY name == 'nick miller';
+f = FILTER u2 BY age > 70 ;
+u3 = UNION ONSCHEMA e, f;
+store u3 into ':OUTPATH:';\,
+            },
+            {
+            # PIG-5082. Similar to MultiQuery_Union_13 but for non-store vertex group
+            'num' => 14,
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa);
+b = load ':INPATH:/singlefile/studenttab10k' as (name, age:int, gpa);
+u1 = union onschema a, b;
+SPLIT u1 INTO r IF age < 30, s OTHERWISE;
+c = load ':INPATH:/singlefile/voternulltab10k' as (votername, voterage, registration, contributions);
+d = JOIN r BY name LEFT, c BY votername;
+u2 = UNION ONSCHEMA d, s;
+e = FILTER u2 BY name == 'nick miller';
+f = FILTER u2 BY age > 70 ;
+u3 = UNION ONSCHEMA e, f;
+SPLIT u3 INTO t if age > 75, u OTHERWISE;
+v = JOIN t BY name LEFT, c BY votername;
+store v into ':OUTPATH:';\,
+            }
             ] # end of tests
         },
         
@@ -860,7 +906,38 @@ m = UNION e, i, j, n;
 
 n = JOIN a BY name, m BY name;
 store n into ':OUTPATH:';\,
-            }
+            },
+            {
+            # Self join bloom left outer
+            'num' => 12,
+            'execonly' => 'tez',
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = filter a by gpa >= 3.9;
+c = filter a by gpa > 3;
+d = join b by name left outer, c by name using 'bloom';
+store d into ':OUTPATH:';\,
+            'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = filter a by gpa >= 3.9;
+c = filter a by gpa > 3;
+d = join b by name left outer, c by name;
+store d into ':OUTPATH:';\,
+            },
+            {
+            # Self join bloom left outer with strategy as reduce
+            'num' => 13,
+            'execonly' => 'tez',
+            'java_params' => ['-Dpig.bloomjoin.strategy=reduce'],
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = filter a by gpa >= 3.9;
+c = filter a by gpa > 3;
+d = join b by name left outer, c by name using 'bloom';
+store d into ':OUTPATH:';\,
+            'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = filter a by gpa >= 3.9;
+c = filter a by gpa > 3;
+d = join b by name left outer, c by name;
+store d into ':OUTPATH:';\,
+            },
             ] # end of tests
         },
 

Modified: pig/branches/spark/test/e2e/pig/tests/negative.conf
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/negative.conf?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/tests/negative.conf (original)
+++ pig/branches/spark/test/e2e/pig/tests/negative.conf Wed Feb 22 09:43:41 2017
@@ -473,7 +473,7 @@ define CMD `perl PigStreaming.pl` input(
 A = load ':INPATH:/singlefile/studenttab10k';
 B = stream A through CMD;
 store B into ':OUTPATH:';\,
-                        'expected_err_regex' => "Error reading output from Streaming binary",
+                        'expected_err_regex' => "Error reading output from Streaming binary|Error while reading from POStream and passing it to the streaming process",
                         },
 			{
 			# Invalid serializer - throws exception
@@ -568,24 +568,7 @@ store D into ':OUTPATH:';\,
                         'expected_err_regex' => "Could not resolve StringStoreBad using imports",
 			},
 		]
-		},
-		{
-		'name' => 'LineageErrors',
-		'tests' => [
-			{
-			# UDF returns a bytearray that is cast to an integer
-                'num' => 1,
-                'pig' => q\register :FUNCPATH:/testudf.jar;
-a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
-b = filter a by name lt 'b';
-c = foreach b generate org.apache.pig.test.udf.evalfunc.CreateMap((chararray)name, age);
-d = foreach c generate $0#'alice young';
-split d into e if $0 < 42, f if $0 >= 42;
-store e into ':OUTPATH:';\,
-                'expected_err_regex' => "Received a bytearray from the UDF or Union from two different Loaders. Cannot determine how to convert the bytearray to int",
-            },
-        ]
-        }
+		}
     ]
 }
 ;