You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by zl...@apache.org on 2017/02/22 09:43:46 UTC
svn commit: r1783988 [14/24] - in /pig/branches/spark: ./ bin/ conf/
contrib/piggybank/java/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachel...
Modified: pig/branches/spark/src/org/apache/pig/tools/parameters/PreprocessorContext.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/parameters/PreprocessorContext.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/parameters/PreprocessorContext.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/parameters/PreprocessorContext.java Wed Feb 22 09:43:41 2017
@@ -27,6 +27,8 @@ import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.StringReader;
+import java.util.ArrayDeque;
+import java.util.Deque;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
@@ -40,20 +42,26 @@ import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.validator.BlackAndWhitelistFilter;
import org.apache.pig.validator.PigCommandFilter;
-import org.python.google.common.base.Preconditions;
public class PreprocessorContext {
- private Map<String, String> param_val;
+ private int tableinitsize = 10;
+ private Deque<Map<String,String>> param_val_stack;
- // used internally to detect when a param is set multiple times,
- // but it set with the same value so it's ok not to log a warning
- private Map<String, String> param_source;
-
private PigContext pigContext;
public Map<String, String> getParamVal() {
- return param_val;
+ Map <String, String> ret = new Hashtable <String, String>(tableinitsize);
+
+ //stack (deque) iterates LIFO
+ for (Map <String, String> map : param_val_stack ) {
+ for (Map.Entry<String, String> entry : map.entrySet()) {
+ if( ! ret.containsKey(entry.getKey()) ) {
+ ret.put(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+ return ret;
}
private final Log log = LogFactory.getLog(getClass());
@@ -63,24 +71,15 @@ public class PreprocessorContext {
* smaller number only impacts performance
*/
public PreprocessorContext(int limit) {
- param_val = new Hashtable<String, String> (limit);
- param_source = new Hashtable<String, String> (limit);
- }
-
- public PreprocessorContext(Map<String, String> paramVal) {
- param_val = paramVal;
- param_source = new Hashtable<String, String>(paramVal);
+ tableinitsize = limit;
+ param_val_stack = new ArrayDeque<Map<String,String>> ();
+ param_val_stack.push(new Hashtable<String, String> (tableinitsize));
}
public void setPigContext(PigContext context) {
this.pigContext = context;
}
- /*
- public void processLiteral(String key, String val) {
- processLiteral(key, val, true);
- } */
-
/**
* This method generates parameter value by running specified command
*
@@ -102,20 +101,35 @@ public class PreprocessorContext {
processOrdLine(key, val, true);
}
- /*
- public void processLiteral(String key, String val, Boolean overwrite) {
+ public void paramScopePush() {
+ param_val_stack.push( new Hashtable<String, String> (tableinitsize) );
+ }
- if (param_val.containsKey(key)) {
- if (overwrite) {
- log.warn("Warning : Multiple values found for " + key + ". Using value " + val);
- } else {
- return;
+ public void paramScopePop() {
+ param_val_stack.pop();
+ }
+
+ public boolean paramval_containsKey(String key) {
+ for (Map <String, String> map : param_val_stack ) {
+ if( map.containsKey(key) ) {
+ return true;
}
}
+ return false;
+ }
- String sub_val = substitute(val);
- param_val.put(key, sub_val);
- } */
+ public String paramval_get(String key) {
+ for (Map <String, String> map : param_val_stack ) {
+ if( map.containsKey(key) ) {
+ return map.get(key);
+ }
+ }
+ return null;
+ }
+
+ public void paramval_put(String key, String value) {
+ param_val_stack.peek().put(key, value);
+ }
/**
* This method generates parameter value by running specified command
@@ -129,21 +143,21 @@ public class PreprocessorContext {
filter.validate(PigCommandFilter.Command.SH);
}
- if (param_val.containsKey(key)) {
- if (param_source.get(key).equals(val) || !overwrite) {
- return;
- } else {
- log.warn("Warning : Multiple values found for " + key
- + ". Using value " + val);
- }
+ if (paramval_containsKey(key) && !overwrite) {
+ return;
}
- param_source.put(key, val);
-
val = val.substring(1, val.length()-1); //to remove the backticks
String sub_val = substitute(val);
sub_val = executeShellCommand(sub_val);
- param_val.put(key, sub_val);
+
+ if (paramval_containsKey(key) && !paramval_get(key).equals(sub_val) ) {
+ //(boolean overwrite is always true here)
+ log.warn("Warning : Multiple values found for " + key + " command `" + val + "`. "
+ + "Previous value " + paramval_get(key) + ", now using value " + sub_val);
+ }
+
+ paramval_put(key, sub_val);
}
public void validate(String preprocessorCmd) throws FrontendException {
@@ -175,18 +189,18 @@ public class PreprocessorContext {
*/
public void processOrdLine(String key, String val, Boolean overwrite) throws ParameterSubstitutionException {
- if (param_val.containsKey(key)) {
- if (param_source.get(key).equals(val) || !overwrite) {
+ String sub_val = substitute(val, key);
+ if (paramval_containsKey(key)) {
+ if (paramval_get(key).equals(sub_val) || !overwrite) {
return;
} else {
- log.warn("Warning : Multiple values found for " + key + ". Using value " + val);
+ log.warn("Warning : Multiple values found for " + key
+ + ". Previous value " + paramval_get(key)
+ + ", now using value " + sub_val);
}
}
- param_source.put(key, val);
-
- String sub_val = substitute(val, key);
- param_val.put(key, sub_val);
+ paramval_put(key, sub_val);
}
@@ -318,7 +332,7 @@ public class PreprocessorContext {
while (bracketKeyMatcher.find()) {
if ( (bracketKeyMatcher.start() == 0) || (line.charAt( bracketKeyMatcher.start() - 1)) != '\\' ) {
key = bracketKeyMatcher.group(1);
- if (!(param_val.containsKey(key))) {
+ if (!(paramval_containsKey(key))) {
String message;
if (parentKey == null) {
message = "Undefined parameter : " + key;
@@ -327,7 +341,7 @@ public class PreprocessorContext {
}
throw new ParameterSubstitutionException(message);
}
- val = param_val.get(key);
+ val = paramval_get(key);
if (val.contains("$")) {
val = val.replaceAll("(?<!\\\\)\\$", "\\\\\\$");
}
@@ -345,7 +359,7 @@ public class PreprocessorContext {
// for escaped vars of the form \$<id>
if ( (keyMatcher.start() == 0) || (line.charAt( keyMatcher.start() - 1)) != '\\' ) {
key = keyMatcher.group(1);
- if (!(param_val.containsKey(key))) {
+ if (!(paramval_containsKey(key))) {
String message;
if (parentKey == null) {
message = "Undefined parameter : " + key;
@@ -354,7 +368,7 @@ public class PreprocessorContext {
}
throw new ParameterSubstitutionException(message);
}
- val = param_val.get(key);
+ val = paramval_get(key);
if (val.contains("$")) {
val = val.replaceAll("(?<!\\\\)\\$", "\\\\\\$");
}
Modified: pig/branches/spark/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj Wed Feb 22 09:43:41 2017
@@ -23,6 +23,7 @@ options {
STATIC = false;
// Case is ignored in keywords
IGNORE_CASE = true;
+ // DEBUG_PARSER = true;
JAVA_UNICODE_ESCAPE = true;
}
@@ -36,7 +37,7 @@ import java.util.List;
import java.util.ArrayList;
import org.apache.pig.impl.util.StringUtils;
-import jline.ConsoleReader;
+import jline.console.ConsoleReader;
public abstract class PigScriptParser
{
@@ -217,7 +218,7 @@ TOKEN_MGR_DECLS : {
{
/*System.err.print(">> ");
System.err.flush();*/
- consoleReader.setDefaultPrompt(">> ");
+ consoleReader.setPrompt(">> ");
}
}
@@ -267,7 +268,7 @@ TOKEN_MGR_DECLS : {
<"'"> {prevState = PIG_START;} : IN_STRING
| <"`"> {prevState = PIG_START;} : IN_COMMAND
| <(" " | "\t")+["A","a"]["S","s"](" " | "\t")+ > {prevState = PIG_START;} : SCHEMA_DEFINITION
-| <(" " | "\t")+["G","g"]["E","e"]["N","n"]["E","e"]["R","r"]["A","a"]["T","t"]["E","e"](" " | "\t")+ > {prevState = PIG_START;} : GENERATE
+| <(" " | "\t")+["G","g"]["E","e"]["N","n"]["E","e"]["R","r"]["A","a"]["T","t"]["E","e"](" " | "\t" | "\r" | "\n")+ > {prevState = PIG_START;} : GENERATE
| <"{"> {pigBlockLevel = 1;} : IN_BLOCK
| <"}"> {if (true) throw new TokenMgrError("Unmatched '}'", TokenMgrError.LEXICAL_ERROR);}
| <";"> : PIG_END
@@ -292,7 +293,8 @@ TOKEN_MGR_DECLS : {
<IN_STRING> MORE :
{
- <"\\'">
+ <"\\\\">
+| <"\\'">
| <"'"> { SwitchTo(prevState);}
| <("\n" | "\r" | "\r\n")> {secondary_prompt();}
| <(~[])>
@@ -395,7 +397,7 @@ TOKEN_MGR_DECLS : {
{
<"\""> {prevState = IN_BLOCK;} : IN_DOUBLE_QUOTED_STRING
| <(" " | "\t")+["A","a"]["S","s"](" " | "\t")+ > {prevState = IN_BLOCK;} : SCHEMA_DEFINITION
-| <(" " | "\t")+["G","g"]["E","e"]["N","n"]["E","e"]["R","r"]["A","a"]["T","t"]["E","e"](" " | "\t")+> {prevState = IN_BLOCK;} : GENERATE
+| <(" " | "\t")+["G","g"]["E","e"]["N","n"]["E","e"]["R","r"]["A","a"]["T","t"]["E","e"](" " | "\t" | "\r" | "\n")+> {prevState = IN_BLOCK;} : GENERATE
| <"{"> {pigBlockLevel++;}
| <"}"(";")?> {pigBlockLevel--; if (pigBlockLevel == 0) SwitchTo(PIG_END);}
| <"'"> {prevState = IN_BLOCK;} : IN_STRING
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/EmbeddedPigStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/EmbeddedPigStats.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/EmbeddedPigStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/EmbeddedPigStats.java Wed Feb 22 09:43:41 2017
@@ -147,6 +147,11 @@ final class EmbeddedPigStats extends Pig
}
@Override
+ public String getDisplayString() {
+ return null;
+ }
+
+ @Override
public long getProactiveSpillCountObjects() {
throw new UnsupportedOperationException();
}
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/EmptyPigStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/EmptyPigStats.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/EmptyPigStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/EmptyPigStats.java Wed Feb 22 09:43:41 2017
@@ -87,6 +87,11 @@ public class EmptyPigStats extends PigSt
}
@Override
+ public String getDisplayString() {
+ return null;
+ }
+
+ @Override
public JobGraph getJobGraph() {
return emptyJobPlan;
}
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStats.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStats.java Wed Feb 22 09:43:41 2017
@@ -134,6 +134,11 @@ public abstract class PigStats {
}
/**
+ * Returns the display message in pig grunt
+ */
+ public abstract String getDisplayString();
+
+ /**
* Returns the DAG of jobs spawned by the script
*/
public JobGraph getJobGraph() {
@@ -265,6 +270,13 @@ public abstract class PigStats {
return ScriptState.get().getPigVersion();
}
+ /**
+ * Returns the contents of the script that was run.
+ */
+ public String getScript() {
+ return ScriptState.get().getScript();
+ }
+
public String getScriptId() {
return ScriptState.get().getId();
}
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatsUtil.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatsUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatsUtil.java Wed Feb 22 09:43:41 2017
@@ -24,7 +24,7 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
import org.apache.pig.tools.pigstats.mapreduce.SimplePigStats;
/**
@@ -71,7 +71,7 @@ public class PigStatsUtil {
*/
@Deprecated
public static final String FS_COUNTER_GROUP
- = HadoopShims.getFsCounterGroupName();
+ = MRPigStatsUtil.FS_COUNTER_GROUP;
/**
* Returns an empty PigStats object Use of this method is not advised as it
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/ScriptState.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/ScriptState.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/ScriptState.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/ScriptState.java Wed Feb 22 09:43:41 2017
@@ -133,6 +133,8 @@ public abstract class ScriptState {
MERGE_SPARSE_JOIN,
REPLICATED_JOIN,
SKEWED_JOIN,
+ BUILD_BLOOM,
+ FILTER_BLOOM,
HASH_JOIN,
COLLECTED_GROUP,
MERGE_COGROUP,
@@ -312,7 +314,7 @@ public abstract class ScriptState {
maxScriptSize = Integer.valueOf(prop);
}
}
-
+
this.truncatedScript = (script.length() > maxScriptSize) ? script.substring(0, maxScriptSize)
: script;
@@ -485,6 +487,10 @@ public abstract class ScriptState {
public void visit(LOJoin op) {
if (op.getJoinType() == JOINTYPE.HASH) {
feature.set(PIG_FEATURE.HASH_JOIN.ordinal());
+ } else if (op.getJoinType() == JOINTYPE.BLOOM) {
+ feature.set(PIG_FEATURE.HASH_JOIN.ordinal());
+ feature.set(PIG_FEATURE.BUILD_BLOOM.ordinal());
+ feature.set(PIG_FEATURE.FILTER_BLOOM.ordinal());
} else if (op.getJoinType() == JOINTYPE.MERGE) {
feature.set(PIG_FEATURE.MERGE_JOIN.ordinal());
} else if (op.getJoinType() == JOINTYPE.MERGESPARSE) {
@@ -506,6 +512,7 @@ public abstract class ScriptState {
feature.set(PIG_FEATURE.RANK.ordinal());
}
+ @Override
public void visit(LOSort op) {
feature.set(PIG_FEATURE.ORDER_BY.ordinal());
}
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java Wed Feb 22 09:43:41 2017
@@ -32,15 +32,16 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.TaskReport;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.TaskReport;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.PigCounters;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
import org.apache.pig.impl.io.FileSpec;
@@ -53,6 +54,8 @@ import org.apache.pig.tools.pigstats.Out
import org.apache.pig.tools.pigstats.PigStats.JobGraph;
import org.apache.pig.tools.pigstats.PigStats.JobGraphPrinter;
+import org.python.google.common.collect.Lists;
+
/**
* This class encapsulates the runtime statistics of a MapReduce job.
@@ -281,7 +284,7 @@ public final class MRJobStats extends Jo
void addCounters(Job job) {
try {
- counters = HadoopShims.getCounters(job);
+ counters = getCounters(job);
} catch (IOException e) {
LOG.warn("Unable to get job counters", e);
}
@@ -349,13 +352,13 @@ public final class MRJobStats extends Jo
void addMapReduceStatistics(Job job) {
Iterator<TaskReport> maps = null;
try {
- maps = HadoopShims.getTaskReports(job, TaskType.MAP);
+ maps = getTaskReports(job, TaskType.MAP);
} catch (IOException e) {
LOG.warn("Failed to get map task report", e);
}
Iterator<TaskReport> reduces = null;
try {
- reduces = HadoopShims.getTaskReports(job, TaskType.REDUCE);
+ reduces = getTaskReports(job, TaskType.REDUCE);
} catch (IOException e) {
LOG.warn("Failed to get reduce task report", e);
}
@@ -515,4 +518,35 @@ public final class MRJobStats extends Jo
inputs.add(is);
}
+ public static Iterator<TaskReport> getTaskReports(Job job, TaskType type) throws IOException {
+ if (job.getJobConf().getBoolean(PigConfiguration.PIG_NO_TASK_REPORT, false)) {
+ LOG.info("TaskReports are disabled for job: " + job.getAssignedJobID());
+ return null;
+ }
+ Cluster cluster = new Cluster(job.getJobConf());
+ try {
+ org.apache.hadoop.mapreduce.Job mrJob = cluster.getJob(job.getAssignedJobID());
+ if (mrJob == null) { // In local mode, mrJob will be null
+ mrJob = job.getJob();
+ }
+ org.apache.hadoop.mapreduce.TaskReport[] reports = mrJob.getTaskReports(type);
+ return Lists.newArrayList(reports).iterator();
+ } catch (InterruptedException ir) {
+ throw new IOException(ir);
+ }
+ }
+
+ public static Counters getCounters(Job job) throws IOException {
+ try {
+ Cluster cluster = new Cluster(job.getJobConf());
+ org.apache.hadoop.mapreduce.Job mrJob = cluster.getJob(job.getAssignedJobID());
+ if (mrJob == null) { // In local mode, mrJob will be null
+ mrJob = job.getJob();
+ }
+ return new Counters(mrJob.getCounters());
+ } catch (Exception ir) {
+ throw new IOException(ir);
+ }
+ }
+
}
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRPigStatsUtil.java Wed Feb 22 09:43:41 2017
@@ -33,7 +33,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.NativeMapReduceOper;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.classification.InterfaceAudience.Private;
import org.apache.pig.impl.PigContext;
import org.apache.pig.tools.pigstats.JobStats;
@@ -51,7 +50,7 @@ public class MRPigStatsUtil extends PigS
public static final String TASK_COUNTER_GROUP
= "org.apache.hadoop.mapred.Task$Counter";
public static final String FS_COUNTER_GROUP
- = HadoopShims.getFsCounterGroupName();
+ = "org.apache.hadoop.mapreduce.FileSystemCounter";
private static final Log LOG = LogFactory.getLog(MRPigStatsUtil.class);
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/SimplePigStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/SimplePigStats.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/SimplePigStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/SimplePigStats.java Wed Feb 22 09:43:41 2017
@@ -207,13 +207,18 @@ public final class SimplePigStats extend
}
void display() {
+ LOG.info(getDisplayString());
+ }
+
+ @Override
+ public String getDisplayString() {
if (returnCode == ReturnCode.UNKNOWN) {
LOG.warn("unknown return code, can't display the results");
- return;
+ return "";
}
if (pigContext == null) {
LOG.warn("unknown exec type, don't display the results");
- return;
+ return "";
}
SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT);
@@ -276,7 +281,7 @@ public final class SimplePigStats extend
sb.append("\nJob DAG:\n").append(jobPlan.toString());
- LOG.info("Script Statistics: \n" + sb.toString());
+ return "Script Statistics: \n" + sb.toString();
}
void mapMROperToJob(MapReduceOper mro, Job job) {
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java Wed Feb 22 09:43:41 2017
@@ -23,7 +23,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -40,7 +39,6 @@ import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.tools.pigstats.InputStats;
import org.apache.pig.tools.pigstats.JobStats;
-import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.ScriptState;
import org.apache.spark.api.java.JavaSparkContext;
@@ -115,29 +113,35 @@ public class SparkPigStats extends PigSt
}
private void display() {
+ LOG.info(getDisplayString());
+ }
+
+ public String getDisplayString() {
+ StringBuilder sb = new StringBuilder();
Iterator<JobStats> iter = jobPlan.iterator();
while (iter.hasNext()) {
- SparkJobStats js = (SparkJobStats)iter.next();
+ SparkJobStats js = (SparkJobStats) iter.next();
if (jobSparkOperatorMap.containsKey(js)) {
SparkOperator sparkOperator = jobSparkOperatorMap.get(js);
js.setAlias(sparkOperator);
}
- LOG.info( "Spark Job [" + js.getJobId() + "] Metrics");
+ sb.append("Spark Job [" + js.getJobId() + "] Metrics");
Map<String, Long> stats = js.getStats();
if (stats == null) {
- LOG.info("No statistics found for job " + js.getJobId());
- return;
+ sb.append("No statistics found for job " + js.getJobId());
+ return sb.toString();
}
Iterator statIt = stats.entrySet().iterator();
while (statIt.hasNext()) {
- Map.Entry pairs = (Map.Entry)statIt.next();
- LOG.info("\t" + pairs.getKey() + " : " + pairs.getValue());
+ Map.Entry pairs = (Map.Entry) statIt.next();
+ sb.append("\t" + pairs.getKey() + " : " + pairs.getValue());
}
- for (InputStats inputStat : js.getInputs()){
- LOG.info("\t"+inputStat.getDisplayString());
+ for (InputStats inputStat : js.getInputs()) {
+ sb.append("\t" + inputStat.getDisplayString());
}
}
+ return sb.toString();
}
@Override
@@ -217,4 +221,4 @@ public class SparkPigStats extends PigSt
sparkOperatorsSet.add(sparkOperator);
}
-}
+}
\ No newline at end of file
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java Wed Feb 22 09:43:41 2017
@@ -18,12 +18,8 @@
package org.apache.pig.tools.pigstats.spark;
-import java.util.List;
-
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
@@ -101,21 +97,7 @@ public class SparkStatsUtil {
public static long getLoadSparkCounterValue(POLoad load) {
SparkPigStatusReporter reporter = SparkPigStatusReporter.getInstance();
- int loadersCount = countCoLoadsIfInSplit(load,load.getParentPlan());
- return reporter.getCounters().getValue(SPARK_INPUT_COUNTER_GROUP, getLoadSparkCounterName(load))/loadersCount;
- }
-
- private static int countCoLoadsIfInSplit(PhysicalOperator op, PhysicalPlan pp){
- List<PhysicalOperator> successors = pp.getSuccessors(op);
- if (successors == null || successors.size()==0) return 1;
- for (PhysicalOperator successor : successors){
- if (successor instanceof POSplit){
- return ((POSplit)successor).getPlans().size();
- }else{
- return countCoLoadsIfInSplit(successor,pp);
- }
- }
- return 1;
+ return reporter.getCounters().getValue(SPARK_INPUT_COUNTER_GROUP, getLoadSparkCounterName(load));
}
public static boolean isJobSuccess(int jobID,
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java Wed Feb 22 09:43:41 2017
@@ -245,7 +245,11 @@ public class TezDAGStats extends JobStat
OutputStats existingOut = outputsByLocation.get(output.getLocation());
// In case of multistore, bytesWritten is already calculated
// from size of all the files in the output directory.
- if (!output.getPOStore().isMultiStore() && output.getBytes() > -1) {
+ // So use that if there is a combination of multistore and single store
+ if (output.getPOStore().isMultiStore()) {
+ existingOut.setBytes(output.getBytes());
+ existingOut.setPOStore(output.getPOStore());
+ } else if (!existingOut.getPOStore().isMultiStore() && output.getBytes() > -1) {
long bytes = existingOut.getBytes() > -1
? (existingOut.getBytes() + output.getBytes())
: output.getBytes();
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java Wed Feb 22 09:43:41 2017
@@ -117,6 +117,11 @@ public class TezPigScriptStats extends P
}
private void display() {
+ LOG.info(getDisplayString());
+ }
+
+ @Override
+ public String getDisplayString() {
SimpleDateFormat sdf = new SimpleDateFormat(DATE_FORMAT);
StringBuilder sb = new StringBuilder();
sb.append("\n");
@@ -170,7 +175,7 @@ public class TezPigScriptStats extends P
for (OutputStats os : getOutputStats()) {
sb.append(os.getDisplayString().trim()).append("\n");
}
- LOG.info("Script Statistics:\n" + sb.toString());
+ return "Script Statistics:\n" + sb.toString();
}
/**
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java Wed Feb 22 09:43:41 2017
@@ -275,6 +275,12 @@ public class TezScriptState extends Scri
if (tezOp.isRegularJoin()) {
feature.set(PIG_FEATURE.HASH_JOIN.ordinal());
}
+ if (tezOp.isBuildBloom()) {
+ feature.set(PIG_FEATURE.BUILD_BLOOM.ordinal());
+ }
+ if (tezOp.isFilterBloom()) {
+ feature.set(PIG_FEATURE.FILTER_BLOOM.ordinal());
+ }
if (tezOp.isUnion()) {
feature.set(PIG_FEATURE.UNION.ordinal());
}
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java Wed Feb 22 09:43:41 2017
@@ -22,6 +22,7 @@ import static org.apache.pig.tools.pigst
import static org.apache.pig.tools.pigstats.tez.TezDAGStats.TASK_COUNTER_GROUP;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -289,13 +290,19 @@ public class TezVertexStats extends JobS
}
// Split followed by union will have multiple stores writing to same location
- Map<String, POStore> uniqueOutputs = new HashMap<String, POStore>();
+ Map<String, List<POStore>> uniqueOutputs = new HashMap<String, List<POStore>>();
for (POStore sto : stores) {
POStoreTez store = (POStoreTez) sto;
- uniqueOutputs.put(store.getOutputKey(), store);
+ List<POStore> stores = uniqueOutputs.get(store.getOutputKey());
+ if (stores == null) {
+ stores = new ArrayList<POStore>();
+ }
+ stores.add(store);
+ uniqueOutputs.put(store.getOutputKey(), stores);
}
- for (POStore sto : uniqueOutputs.values()) {
+ for (List<POStore> stores : uniqueOutputs.values()) {
+ POStore sto = stores.get(0);
if (sto.isTmpStore()) {
continue;
}
@@ -304,11 +311,16 @@ public class TezVertexStats extends JobS
String filename = sto.getSFile().getFileName();
if (counters != null) {
if (msGroup != null) {
- Long n = msGroup.get(PigStatsUtil.getMultiStoreCounterName(sto));
- if (n != null) records = n;
- }
- if (records == -1) {
- records = outputRecords;
+ long n = 0;
+ Long val = null;
+ for (POStore store : stores) {
+ val = msGroup.get(PigStatsUtil.getMultiStoreCounterName(store));
+ // Tez removes 0 value counters for efficiency.
+ if (val != null) {
+ n += val;
+ };
+ }
+ records = n;
}
if (isSuccessful() && records == -1) {
// Tez removes 0 value counters for efficiency.
@@ -338,13 +350,13 @@ public class TezVertexStats extends JobS
@Override
@Deprecated
public int getNumberMaps() {
- return this.isMapOpts ? numTasks : -1;
+ return this.isMapOpts ? numTasks : 0;
}
@Override
@Deprecated
public int getNumberReduces() {
- return this.isMapOpts ? -1 : numTasks;
+ return this.isMapOpts ? 0 : numTasks;
}
@Override
@@ -386,25 +398,25 @@ public class TezVertexStats extends JobS
@Override
@Deprecated
public long getMapInputRecords() {
- return this.isMapOpts ? numInputRecords : -1;
+ return this.isMapOpts ? numInputRecords : 0;
}
@Override
@Deprecated
public long getMapOutputRecords() {
- return this.isMapOpts ? numOutputRecords : -1;
+ return this.isMapOpts ? numOutputRecords : 0;
}
@Override
@Deprecated
public long getReduceInputRecords() {
- return this.isMapOpts ? -1 : numInputRecords;
+ return numReduceInputRecords;
}
@Override
@Deprecated
public long getReduceOutputRecords() {
- return this.isMapOpts ? -1 : numOutputRecords;
+ return this.isMapOpts ? 0 : numOutputRecords;
}
@Override
Modified: pig/branches/spark/src/pig-default.properties
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/pig-default.properties?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/pig-default.properties (original)
+++ pig/branches/spark/src/pig-default.properties Wed Feb 22 09:43:41 2017
@@ -61,4 +61,8 @@ pig.stats.output.size.reader.unsupported
pig.tez.opt.union.unsupported.storefuncs=org.apache.hcatalog.pig.HCatStorer,org.apache.hive.hcatalog.pig.HCatStorer,org.apache.pig.piggybank.storage.DBStorage,org.apache.pig.piggybank.storage.MultiStorage
-pig.sort.readonce.loadfuncs=org.apache.pig.backend.hadoop.hbase.HBaseStorage,org.apache.pig.backend.hadoop.accumulo.AccumuloStorage
\ No newline at end of file
+pig.sort.readonce.loadfuncs=org.apache.pig.backend.hadoop.hbase.HBaseStorage,org.apache.pig.backend.hadoop.accumulo.AccumuloStorage
+
+pig.ats.enabled=true
+
+pig.tez.configure.am.memory=true
Added: pig/branches/spark/start-build-env.sh
URL: http://svn.apache.org/viewvc/pig/branches/spark/start-build-env.sh?rev=1783988&view=auto
==============================================================================
--- pig/branches/spark/start-build-env.sh (added)
+++ pig/branches/spark/start-build-env.sh Wed Feb 22 09:43:41 2017
@@ -0,0 +1,63 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+set -e # exit on error
+
+cd "$(dirname "$0")" # connect to root
+
+docker build -t pig-build dev-support/docker
+
+if [ "$(uname -s)" == "Linux" ]; then
+ USER_NAME=${SUDO_USER:=${USER}}
+ USER_ID=$(id -u "${USER_NAME}")
+ GROUP_ID=$(id -g "${USER_NAME}")
+else # boot2docker uid and gid
+ USER_NAME=${USER}
+ USER_ID=1000
+ GROUP_ID=50
+fi
+
+docker build -t "pig-build-${USER_NAME}" - <<UserSpecificDocker
+FROM pig-build
+RUN bash configure-for-user.sh ${USER_NAME} ${USER_ID} ${GROUP_ID} "$(fgrep vboxsf /etc/group)"
+UserSpecificDocker
+
+# By mapping the .m2 directory you can do an mvn install from
+# within the container and use the result on your normal
+# system. This also is a significant speedup in subsequent
+# builds because the dependencies are downloaded only once.
+# Same with the .ivy2 directory
+
+DOCKER="docker run --rm=true -t -i"
+DOCKER=${DOCKER}" -u ${USER_NAME}"
+
+# Work in the current directory
+DOCKER=${DOCKER}" -v ${PWD}:/home/${USER_NAME}/pig"
+DOCKER=${DOCKER}" -w /home/${USER_NAME}/pig"
+
+# Mount persistent caching of 'large' downloads
+DOCKER=${DOCKER}" -v ${HOME}/.m2:/home/${USER_NAME}/.m2"
+DOCKER=${DOCKER}" -v ${HOME}/.ivy2:/home/${USER_NAME}/.ivy2"
+
+# What do we run?
+DOCKER=${DOCKER}" --name pig-build-${USER_NAME}-$$"
+DOCKER=${DOCKER}" pig-build-${USER_NAME}"
+DOCKER=${DOCKER}" bash"
+
+# Now actually start it
+${DOCKER}
+
Modified: pig/branches/spark/test/e2e/pig/build.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/build.xml?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/build.xml (original)
+++ pig/branches/spark/test/e2e/pig/build.xml Wed Feb 22 09:43:41 2017
@@ -27,9 +27,8 @@
<property name="hive.lib.dir"
value="${pig.base.dir}/build/ivy/lib/Pig"/>
- <condition property="hive.hadoop.shims.version" value="0.23" else="0.20S">
- <equals arg1="${hadoopversion}" arg2="23" />
- </condition>
+ <property name="hadoopversion" value="2" />
+ <property name="hive.hadoop.shims.version" value="0.23" />
<property name="mvnrepo" value="http://repo2.maven.org/maven2"/>
@@ -61,6 +60,7 @@
<property name="harness.PH_LOCAL" value="."/>
<property name="harness.PH_OUT" value="."/>
<property name="harness.PERL5LIB" value="./libexec"/>
+ <property name="harness.user.home" value="/user/pig" />
<property name="test.location" value="${basedir}/testdist"/>
<property name="benchmark.location" value="${test.location}/benchmarks"/>
@@ -137,6 +137,7 @@
<path path="${test.location}/tests/multiquery.conf"/>
<path path="${test.location}/tests/negative.conf"/>
<path path="${test.location}/tests/nightly.conf"/>
+ <path path="${test.location}/tests/join.conf"/>
<path path="${test.location}/tests/streaming.conf"/>
<path path="${test.location}/tests/streaming_local.conf"/>
<path path="${test.location}/tests/turing_jython.conf"/>
@@ -309,6 +310,7 @@
<env key="PH_HIVE_LIB_DIR" value="${hive.lib.dir}"/>
<env key="PH_HIVE_VERSION" value="${hive.version}"/>
<env key="PH_HIVE_SHIMS_VERSION" value="${hive.hadoop.shims.version}"/>
+ <env key="PH_HDFS_BASE" value="${harness.user.home}" />
<env key="HARNESS_CONF" value="${harness.conf.file}"/>
<env key="HADOOP_HOME" value="${harness.hadoop.home}"/>
<env key="HADOOP_PREFIX" value="${HADOOP_PREFIX}"/>
@@ -369,6 +371,7 @@
<env key="PH_CLUSTER_BIN" value="${harness.cluster.bin}"/>
<env key="HARNESS_CONF" value="${harness.conf.file}"/>
<env key="HADOOP_HOME" value="${harness.hadoop.home}"/>
+ <env key="PH_HDFS_BASE" value="${harness.user.home}" />
<arg value="./test_harness.pl"/>
<arg value="-deploycfg"/>
Modified: pig/branches/spark/test/e2e/pig/conf/spark.conf
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/conf/spark.conf?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/conf/spark.conf (original)
+++ pig/branches/spark/test/e2e/pig/conf/spark.conf Wed Feb 22 09:43:41 2017
@@ -30,8 +30,8 @@ my $hdfsBase = $ENV{PH_HDFS_BASE} || "/u
$cfg = {
#HDFS
- 'inpathbase' => "$ENV{PH_ROOT}/data"
- , 'outpathbase' => "$ENV{PH_ROOT}/testout"
+ 'inpathbase' => "$hdfsBase/test/data"
+ , 'outpathbase' => "$hdfsBase/out"
#LOCAL
, 'localinpathbase' => "$ENV{PH_LOCAL}/in"
@@ -55,7 +55,7 @@ $cfg = {
, 'hcatbin' => "$ENV{HCAT_BIN}"
, 'usePython' => "$ENV{PIG_USE_PYTHON}"
, 'exectype' => 'spark'
- , 'benchmark_exectype' => 'local'
+ , 'benchmark_exectype' => 'mapred'
#HADOOP
, 'mapredjars' => "$ENV{PH_ROOT}/lib"
Modified: pig/branches/spark/test/e2e/pig/deployers/ExistingClusterDeployer.pm
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/deployers/ExistingClusterDeployer.pm?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/deployers/ExistingClusterDeployer.pm (original)
+++ pig/branches/spark/test/e2e/pig/deployers/ExistingClusterDeployer.pm Wed Feb 22 09:43:41 2017
@@ -231,11 +231,6 @@ sub generateData
'rows' => 5000,
'hdfs' => "types/numbers.txt",
}, {
- 'name' => "biggish",
- 'filetype' => "biggish",
- 'rows' => 1000000,
- 'hdfs' => "singlefile/biggish",
- }, {
'name' => "prerank",
'filetype' => "ranking",
'rows' => 30,
Modified: pig/branches/spark/test/e2e/pig/deployers/LocalDeployer.pm
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/deployers/LocalDeployer.pm?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/deployers/LocalDeployer.pm (original)
+++ pig/branches/spark/test/e2e/pig/deployers/LocalDeployer.pm Wed Feb 22 09:43:41 2017
@@ -209,11 +209,21 @@ sub generateData
'filetype' => "ranking",
'rows' => 30,
'outfile' => "singlefile/prerank",
+ }, {
+ 'name' => "utf8Voter",
+ 'filetype' => "utf8Voter",
+ 'rows' => 30,
+ 'outfile' => "utf8Data/选民/utf8Voter",
+ }, {
+ 'name' => "utf8Student",
+ 'filetype' => "utf8Student",
+ 'rows' => 300,
+ 'outfile' => "utf8Data/学生/utf8Student",
}
);
# Create the target directories
- for my $dir ("singlefile", "dir", "types", "glob/star/somegood",
+ for my $dir ("singlefile", "utf8Data/选民", "utf8Data/学生", "dir", "types", "glob/star/somegood",
"glob/star/moregood", "glob/star/bad") {
my @cmd = ("mkdir", "-p", "$cfg->{'inpathbase'}/$dir");
$self->runCmd($log, \@cmd);
Modified: pig/branches/spark/test/e2e/pig/drivers/TestDriverPig.pm
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/drivers/TestDriverPig.pm?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/drivers/TestDriverPig.pm (original)
+++ pig/branches/spark/test/e2e/pig/drivers/TestDriverPig.pm Wed Feb 22 09:43:41 2017
@@ -211,13 +211,6 @@ sub runTest
$testCmd->{'pig'} = $testCmd->{'pig_win'};
}
- if ( $testCmd->{'hadoopversion'} == '23' && $testCmd->{'pig23'}) {
- $oldpig = $testCmd->{'pig'};
- $testCmd->{'pig'} = $testCmd->{'pig23'};
- }
- if ( $testCmd->{'hadoopversion'} == '23' && $testCmd->{'expected_err_regex23'}) {
- $testCmd->{'expected_err_regex'} = $testCmd->{'expected_err_regex23'};
- }
my $res = $self->runPigCmdLine( $testCmd, $log, 1, $resources );
if ($oldpig) {
$testCmd->{'pig'} = $oldpig;
@@ -231,10 +224,6 @@ sub runTest
$testCmd->{'pig'} = $testCmd->{'pig_win'};
}
- if ( $testCmd->{'hadoopversion'} == '23' && $testCmd->{'pig23'}) {
- $oldpig = $testCmd->{'pig'};
- $testCmd->{'pig'} = $testCmd->{'pig23'};
- }
my $res = $self->runPig( $testCmd, $log, 1, $resources );
if ($oldpig) {
$testCmd->{'pig'} = $oldpig;
@@ -686,9 +675,6 @@ sub generateBenchmark
if ((Util::isWindows()||Util::isCygwin()) && $testCmd->{'pig_win'}) {
$modifiedTestCmd{'pig'} = $testCmd->{'pig_win'};
}
- if ( $testCmd->{'hadoopversion'} == '23' && $testCmd->{'pig23'}) {
- $modifiedTestCmd{'pig'} = $testCmd->{'pig23'};
- }
# Change so we're looking at the old version of Pig
if (defined $testCmd->{'oldpigpath'} && $testCmd->{'oldpigpath'} ne "") {
$modifiedTestCmd{'pigpath'} = $testCmd->{'oldpigpath'};
@@ -1058,10 +1044,6 @@ sub wrongExecutionMode($$)
}
}
- if (defined $testCmd->{'ignore23'} && $testCmd->{'hadoopversion'}=='23') {
- $wrong = 1;
- }
-
if ($wrong) {
print $log "Skipping test $testCmd->{'group'}" . "_" .
$testCmd->{'num'} . " since it is not supposed to be run in hadoop 23\n";
Modified: pig/branches/spark/test/e2e/pig/streaming/PigStreaming.pl
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/streaming/PigStreaming.pl?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/streaming/PigStreaming.pl (original)
+++ pig/branches/spark/test/e2e/pig/streaming/PigStreaming.pl Wed Feb 22 09:43:41 2017
@@ -73,7 +73,7 @@ while (<$input_handle>)
{
chomp;
$data = $_;
- if (defined(%hash) && (exists $hash{$data}))
+ if (exists $hash{$data})
{
print $output_handle "$hash{$data}\n";
}
Modified: pig/branches/spark/test/e2e/pig/tests/grunt.conf
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/grunt.conf?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/tests/grunt.conf (original)
+++ pig/branches/spark/test/e2e/pig/tests/grunt.conf Wed Feb 22 09:43:41 2017
@@ -46,7 +46,12 @@ $cfg = {
'execonly' => 'mapred,tez', # don't have a clue what their cwd will be for local mode
'expected_out_regex' => "/user",
'rc' => 0
-
+ },{
+ 'num' => 3,
+ 'pig' => "ls .",
+ 'execonly' => 'mapred,tez',
+ 'expected_out_regex' => "/user",
+ 'rc' => 0
},{
'num' => 4,
'pig' => "ls :INPATH:",
@@ -77,21 +82,22 @@ $cfg = {
'rc' => 0
},{
'num' => 10,
- 'pig' => "cp :INPATH:/singlefile/studenttab10k .
- ls .",
+ 'pig' => "mkdir :OUTPATH:
+ cp :INPATH:/singlefile/studenttab10k :OUTPATH:
+ ls :OUTPATH:",
'expected_out_regex' => ".*studenttab10k",
'rc' => 0
},{
'num' => 11,
- 'pig' => "cp :INPATH:/singlefile/studenttab10k ./fred
- ls .",
+ 'pig' => "cp :INPATH:/singlefile/studenttab10k :OUTPATH:/fred
+ ls :OUTPATH:",
'expected_out_regex' => ".*fred",
'rc' => 0
},{
'num' => 12,
- 'pig' => "cp :INPATH:/singlefile/studenttab10k ./jim
- mv ./jim ./bob
- ls .",
+ 'pig' => "cp :INPATH:/singlefile/studenttab10k :OUTPATH:/jim
+ mv :OUTPATH:/jim :OUTPATH:/bob
+ ls :OUTPATH:",
'expected_out_regex' => ".*bob",
'rc' => 0
},{
@@ -103,18 +109,19 @@ $cfg = {
},{
'num' => 14,
'pig' => "copyToLocal :INPATH:/singlefile/votertab10k :TMP:
- copyFromLocal :TMP:/votertab10k ./joe
- cat ./joe",
+ copyFromLocal :TMP:/votertab10k :OUTPATH:/joe
+ cat :OUTPATH:/joe",
'expected_out_regex' => ":Grunt_14_output:",
'rc' => 0
},{
'num' => 15,
- 'pig' => "rm fred bob joe",
- 'not_expected_out_regex' => "joe",
+ 'pig' => "cp :INPATH:/singlefile/studenttab10k :OUTPATH:/fred
+ rm :OUTPATH:/fred",
+ 'not_expected_out_regex' => "fred",
'rc' => 0
},{
'num' => 16,
- 'pig' => "rmf jill",
+ 'pig' => "rmf :OUTPATH:/jill",
'rc' => 0
}
]
Modified: pig/branches/spark/test/e2e/pig/tests/hcat.conf
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/hcat.conf?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/tests/hcat.conf (original)
+++ pig/branches/spark/test/e2e/pig/tests/hcat.conf Wed Feb 22 09:43:41 2017
@@ -44,7 +44,7 @@ stored as textfile;\,
'num' => 2,
'java_params' => ['-Dhcat.bin=:HCATBIN:'],
'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
-SQL drop table if exists pig_hcat_ddl_1;
+SQL drop table if exists pig_hcat_ddl_1 purge;
sql create table pig_hcat_ddl_1(name string,
age int,
gpa double)
@@ -55,6 +55,35 @@ store a into ':OUTPATH:';\,
},
]
},
+ {
+ 'name' => 'Jython_HCatDDL',
+ 'tests' => [
+ {
+ # sql command
+ 'num' => 1
+ ,'java_params' => ['-Dhcat.bin=:HCATBIN:']
+ ,'pig' => q\#!/usr/bin/python
+from org.apache.pig.scripting import Pig
+
+#create pig script
+
+Pig.sql("""sql drop table if exists pig_script_hcat_ddl_1;""")
+ret = Pig.sql("""sql create table pig_script_hcat_ddl_1(name string,
+age int,
+gpa double)
+stored as textfile;
+""")
+
+if ret==0:
+ print "SQL command PASSED"
+
+else:
+ raise "SQL command FAILED"
+\
+ ,'rc' => 0
+ },
+ ]
+ },
]
}
;
Added: pig/branches/spark/test/e2e/pig/tests/join.conf
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/join.conf?rev=1783988&view=auto
==============================================================================
--- pig/branches/spark/test/e2e/pig/tests/join.conf (added)
+++ pig/branches/spark/test/e2e/pig/tests/join.conf Wed Feb 22 09:43:41 2017
@@ -0,0 +1,310 @@
+#!/usr/bin/env perl
+############################################################################
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+###############################################################################
+
+$cfg = {
+ 'driver' => 'Pig',
+
+ 'groups' => [
+ {
+ 'name' => 'BloomJoin_Map',
+ 'execonly' => 'tez',
+ 'tests' => [
+ {
+ # Tuple join key
+ 'num' => 1,
+ 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
+--c = filter a by age < 20;
+--d = filter b by age < 20;
+e = join a by (name, age), b by (name, age) using 'bloom';
+store e into ':OUTPATH:';\,
+ 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
+--c = filter a by age < 20;
+--d = filter b by age < 20;
+e = join a by (name, age), b by (name, age);
+store e into ':OUTPATH:';\,
+ },
+ {
+ # bytearray join key
+ 'num' => 2,
+ 'pig' => q\
+SET mapreduce.input.fileinputformat.split.maxsize '50000';
+SET pig.splitCombination false;
+a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
+c = filter a by age < 20;
+d = filter b by age < 20;
+e = join c by name, d by name using 'bloom';
+store e into ':OUTPATH:';\,
+ 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
+c = filter a by age < 20;
+d = filter b by age < 20;
+e = join c by name, d by name;
+store e into ':OUTPATH:';\,
+ },
+ {
+ # Left outer join and chararray join key
+ 'num' => 3,
+ 'pig' => q\
+SET mapreduce.input.fileinputformat.split.maxsize '50000';
+SET pig.splitCombination false;
+a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name:chararray, age, registration, contributions);
+c = join a by name left, b by name using 'bloom';
+d = foreach c generate a::name, a::age, gpa, registration, contributions;
+store d into ':OUTPATH:';\,
+ 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name:chararray, age, registration, contributions);
+c = join a by name left, b by name;
+d = foreach c generate a::name, a::age, gpa, registration, contributions;
+store d into ':OUTPATH:';\,
+ },
+ {
+ # Right outer join
+ 'num' => 4,
+ 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name:chararray, age:int, registration, contributions);
+c = join a by (name,age) right, b by (name,age) using 'bloom';
+store c into ':OUTPATH:';\,
+ 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name:chararray, age:int, registration, contributions);
+c = join a by (name,age) right, b by (name,age);
+store c into ':OUTPATH:';\,
+ },
+ {
+ # Left input from a union
+ 'num' => 5,
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name:chararray, age:int, gpa);
+c = union a, b;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+d = filter d by age > 60;
+e = join c by name, d by name using 'bloom' PARALLEL 3;
+store e into ':OUTPATH:';\,
+ 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name:chararray, age:int, gpa);
+c = union a, b;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+d = filter d by age > 60;
+e = join c by name, d by name;
+store e into ':OUTPATH:';\,
+ },
+ {
+ # Right input from a union and integer join key
+ 'num' => 6,
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name:chararray, age:int, gpa);
+c = union a, b;
+c = filter c by age > 75;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+e = join d by age, c by age using 'bloom' PARALLEL 3;
+store e into ':OUTPATH:';\,
+ 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name:chararray, age:int, gpa);
+c = union a, b;
+c = filter c by age > 75;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+e = join d by age, c by age;
+store e into ':OUTPATH:';\,
+ },
+ {
+ # Left input from a split
+ 'num' => 7,
+ 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age:int, registration, contributions);
+b = filter b by age > 75;
+c = filter a by age > 50;
+d = join a by age, b by age using 'bloom';
+store c into ':OUTPATH:.1';
+store d into ':OUTPATH:.2';\,
+ 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age:int, registration, contributions);
+b = filter b by age > 75;
+c = filter a by age > 50;
+d = join a by age, b by age;
+store c into ':OUTPATH:.1';
+store d into ':OUTPATH:.2';\,
+ },
+ {
+ # Right input from a split
+ 'num' => 8,
+ 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age:int, registration, contributions);
+c = filter a by age > 75;
+d = filter a by name == 'nick miller';
+e = join b by age, c by age using 'bloom';
+store d into ':OUTPATH:.1';
+store e into ':OUTPATH:.2';\,
+ 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age:int, registration, contributions);
+c = filter a by age > 75;
+d = filter a by name == 'nick miller';
+e = join b by age, c by age;
+store d into ':OUTPATH:.1';
+store e into ':OUTPATH:.2';\,
+ },
+ ] # end of tests
+ },
+ {
+ 'name' => 'BloomJoin_Reduce',
+ 'execonly' => 'tez',
+ 'java_params' => ['-Dpig.bloomjoin.strategy=reduce'],
+ 'tests' => [
+ {
+ # Tuple join key
+ 'num' => 1,
+ 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
+--c = filter a by age < 20;
+--d = filter b by age < 20;
+e = join a by (name, age), b by (name, age) using 'bloom';
+store e into ':OUTPATH:';\,
+ 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
+--c = filter a by age < 20;
+--d = filter b by age < 20;
+e = join a by (name, age), b by (name, age);
+store e into ':OUTPATH:';\,
+ },
+ {
+ # bytearray join key
+ 'num' => 2,
+ 'pig' => q\
+SET mapreduce.input.fileinputformat.split.maxsize '50000';
+SET pig.splitCombination false;
+a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
+c = filter a by age < 20;
+d = filter b by age < 20;
+e = join c by name, d by name using 'bloom';
+store e into ':OUTPATH:';\,
+ 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
+c = filter a by age < 20;
+d = filter b by age < 20;
+e = join c by name, d by name;
+store e into ':OUTPATH:';\,
+ },
+ {
+ # Left outer join and chararray join key
+ 'num' => 3,
+ 'pig' => q\
+SET mapreduce.input.fileinputformat.split.maxsize '50000';
+SET pig.splitCombination false;
+a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name:chararray, age, registration, contributions);
+c = join a by name left, b by name using 'bloom';
+d = foreach c generate a::name, a::age, gpa, registration, contributions;
+store d into ':OUTPATH:';\,
+ 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name:chararray, age, registration, contributions);
+c = join a by name left, b by name;
+d = foreach c generate a::name, a::age, gpa, registration, contributions;
+store d into ':OUTPATH:';\,
+ },
+ {
+ # Right outer join
+ 'num' => 4,
+ 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name:chararray, age:int, registration, contributions);
+c = join a by (name,age) right, b by (name,age) using 'bloom';
+store c into ':OUTPATH:';\,
+ 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name:chararray, age:int, registration, contributions);
+c = join a by (name,age) right, b by (name,age);
+store c into ':OUTPATH:';\,
+ },
+ {
+ # Left input from a union
+ 'num' => 5,
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name:chararray, age:int, gpa);
+c = union a, b;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+d = filter d by age > 60;
+e = join c by name, d by name using 'bloom' PARALLEL 3;
+store e into ':OUTPATH:';\,
+ 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name:chararray, age:int, gpa);
+c = union a, b;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+d = filter d by age > 60;
+e = join c by name, d by name;
+store e into ':OUTPATH:';\,
+ },
+ {
+ # Right input from a union and integer join key
+ 'num' => 6,
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name:chararray, age:int, gpa);
+c = union a, b;
+c = filter c by age > 75;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+e = join d by age, c by age using 'bloom' PARALLEL 3;
+store e into ':OUTPATH:';\,
+ 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name:chararray, age:int, gpa);
+c = union a, b;
+c = filter c by age > 75;
+d = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+e = join d by age, c by age;
+store e into ':OUTPATH:';\,
+ },
+ {
+ # Left input from a split
+ 'num' => 7,
+ 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age:int, registration, contributions);
+b = filter b by age > 75;
+c = filter a by age > 50;
+d = join a by age, b by age using 'bloom';
+store c into ':OUTPATH:.1';
+store d into ':OUTPATH:.2';\,
+ 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age:int, registration, contributions);
+b = filter b by age > 75;
+c = filter a by age > 50;
+d = join a by age, b by age;
+store c into ':OUTPATH:.1';
+store d into ':OUTPATH:.2';\,
+ },
+ {
+ # Right input from a split
+ 'num' => 8,
+ 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age:int, registration, contributions);
+c = filter a by age > 75;
+d = filter a by name == 'nick miller';
+e = join b by age, c by age using 'bloom';
+store d into ':OUTPATH:.1';
+store e into ':OUTPATH:.2';\,
+ 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa);
+b = load ':INPATH:/singlefile/voternulltab10k' as (name, age:int, registration, contributions);
+c = filter a by age > 75;
+d = filter a by name == 'nick miller';
+e = join b by age, c by age;
+store d into ':OUTPATH:.1';
+store e into ':OUTPATH:.2';\,
+ },
+ ] # end of tests
+ }
+ ] # end of groups
+};
\ No newline at end of file
Modified: pig/branches/spark/test/e2e/pig/tests/multiquery.conf
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/multiquery.conf?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/tests/multiquery.conf (original)
+++ pig/branches/spark/test/e2e/pig/tests/multiquery.conf Wed Feb 22 09:43:41 2017
@@ -728,6 +728,52 @@ b = union a1, a2;
c = rank b by name ASC, age DESC DENSE;
store c into ':OUTPATH:';\,
},
+ {
+ # Union + Split + Two replicate join
+ 'num' => 12,
+ 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+a1 = filter a by gpa is null or gpa <= 3.9;
+a2 = filter a by gpa < 2;
+b = union a1, a2;
+c = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
+c1 = filter c by age < 30;
+c2 = filter c by age > 50;
+d = join b by name, c1 by name using 'replicated';
+e = join d by b::name, c2 by name using 'replicated';
+store e into ':OUTPATH:';\,
+ },
+ {
+ # Multiple Union + Multiple Split + Single store
+ 'num' => 13,
+ 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa);
+b = load ':INPATH:/singlefile/studenttab10k' as (name, age:int, gpa);
+u1 = union onschema a, b;
+SPLIT u1 INTO r IF age < 30, s OTHERWISE;
+c = load ':INPATH:/singlefile/voternulltab10k' as (votername, voterage, registration, contributions);
+d = JOIN r BY name LEFT, c BY votername;
+u2 = UNION ONSCHEMA d, s;
+e = FILTER u2 BY name == 'nick miller';
+f = FILTER u2 BY age > 70 ;
+u3 = UNION ONSCHEMA e, f;
+store u3 into ':OUTPATH:';\,
+ },
+ {
+ # PIG-5082. Similar to MultiQuery_Union_13 but for non-store vertex group
+ 'num' => 14,
+ 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa);
+b = load ':INPATH:/singlefile/studenttab10k' as (name, age:int, gpa);
+u1 = union onschema a, b;
+SPLIT u1 INTO r IF age < 30, s OTHERWISE;
+c = load ':INPATH:/singlefile/voternulltab10k' as (votername, voterage, registration, contributions);
+d = JOIN r BY name LEFT, c BY votername;
+u2 = UNION ONSCHEMA d, s;
+e = FILTER u2 BY name == 'nick miller';
+f = FILTER u2 BY age > 70 ;
+u3 = UNION ONSCHEMA e, f;
+SPLIT u3 INTO t if age > 75, u OTHERWISE;
+v = JOIN t BY name LEFT, c BY votername;
+store v into ':OUTPATH:';\,
+ }
] # end of tests
},
@@ -860,7 +906,38 @@ m = UNION e, i, j, n;
n = JOIN a BY name, m BY name;
store n into ':OUTPATH:';\,
- }
+ },
+ {
+ # Self join bloom left outer
+ 'num' => 12,
+ 'execonly' => 'tez',
+ 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = filter a by gpa >= 3.9;
+c = filter a by gpa > 3;
+d = join b by name left outer, c by name using 'bloom';
+store d into ':OUTPATH:';\,
+ 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = filter a by gpa >= 3.9;
+c = filter a by gpa > 3;
+d = join b by name left outer, c by name;
+store d into ':OUTPATH:';\,
+ },
+ {
+ # Self join bloom left outer with strategy as reduce
+ 'num' => 13,
+ 'execonly' => 'tez',
+ 'java_params' => ['-Dpig.bloomjoin.strategy=reduce'],
+ 'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = filter a by gpa >= 3.9;
+c = filter a by gpa > 3;
+d = join b by name left outer, c by name using 'bloom';
+store d into ':OUTPATH:';\,
+ 'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = filter a by gpa >= 3.9;
+c = filter a by gpa > 3;
+d = join b by name left outer, c by name;
+store d into ':OUTPATH:';\,
+ },
] # end of tests
},
Modified: pig/branches/spark/test/e2e/pig/tests/negative.conf
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/negative.conf?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/tests/negative.conf (original)
+++ pig/branches/spark/test/e2e/pig/tests/negative.conf Wed Feb 22 09:43:41 2017
@@ -473,7 +473,7 @@ define CMD `perl PigStreaming.pl` input(
A = load ':INPATH:/singlefile/studenttab10k';
B = stream A through CMD;
store B into ':OUTPATH:';\,
- 'expected_err_regex' => "Error reading output from Streaming binary",
+ 'expected_err_regex' => "Error reading output from Streaming binary|Error while reading from POStream and passing it to the streaming process",
},
{
# Invalid serializer - throws exception
@@ -568,24 +568,7 @@ store D into ':OUTPATH:';\,
'expected_err_regex' => "Could not resolve StringStoreBad using imports",
},
]
- },
- {
- 'name' => 'LineageErrors',
- 'tests' => [
- {
- # UDF returns a bytearray that is cast to an integer
- 'num' => 1,
- 'pig' => q\register :FUNCPATH:/testudf.jar;
-a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
-b = filter a by name lt 'b';
-c = foreach b generate org.apache.pig.test.udf.evalfunc.CreateMap((chararray)name, age);
-d = foreach c generate $0#'alice young';
-split d into e if $0 < 42, f if $0 >= 42;
-store e into ':OUTPATH:';\,
- 'expected_err_regex' => "Received a bytearray from the UDF or Union from two different Loaders. Cannot determine how to convert the bytearray to int",
- },
- ]
- }
+ }
]
}
;