Posted to commits@pig.apache.org by pr...@apache.org on 2014/11/27 13:50:02 UTC
svn commit: r1642132 [8/14] - in /pig/branches/spark: ./ bin/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/datetime/convert/
contrib/piggybank/java/sr...
Modified: pig/branches/spark/src/org/apache/pig/newplan/logical/rules/ConstantCalculator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/newplan/logical/rules/ConstantCalculator.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/newplan/logical/rules/ConstantCalculator.java (original)
+++ pig/branches/spark/src/org/apache/pig/newplan/logical/rules/ConstantCalculator.java Thu Nov 27 12:49:54 2014
@@ -26,6 +26,7 @@ import java.util.Map;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.impl.PigContext;
@@ -48,6 +49,7 @@ import org.apache.pig.newplan.logical.op
import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
import org.apache.pig.newplan.optimizer.Rule;
import org.apache.pig.newplan.optimizer.Transformer;
+import org.joda.time.DateTimeZone;
public abstract class ConstantCalculator extends Rule {
private List<LogicalRelationalOperator> processedOperators = new ArrayList<LogicalRelationalOperator>();
@@ -107,6 +109,7 @@ public abstract class ConstantCalculator
public static class ConstantCalculatorExpressionVisitor extends AllSameExpressionVisitor {
private LogicalRelationalOperator currentOp;
private PigContext pc;
+ private DateTimeZone currentDTZ = null;
public ConstantCalculatorExpressionVisitor(OperatorPlan expPlan,
LogicalRelationalOperator currentOp, PigContext pc) throws FrontendException {
super(expPlan, new ReverseDependencyOrderWalkerWOSeenChk(expPlan));
@@ -148,7 +151,11 @@ public abstract class ConstantCalculator
PhysicalOperator root = expPhysicalPlan.getLeaves().get(0);
try {
UDFContext.getUDFContext().addJobConf(ConfigurationUtil.toConfiguration(pc.getProperties(), true));
+ PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
+ PhysicalOperator.setPigLogger(pigHadoopLogger);
+ setDefaultTimeZone();
val = root.getNext(root.getResultType()).result;
+ restoreDefaultTimeZone();
UDFContext.getUDFContext().addJobConf(null);
} catch (ExecException e) {
throw new FrontendException(e);
@@ -159,7 +166,9 @@ public abstract class ConstantCalculator
UserFuncExpression udf = (UserFuncExpression)op;
try {
UDFContext.getUDFContext().addJobConf(ConfigurationUtil.toConfiguration(pc.getProperties(), true));
+ setDefaultTimeZone();
val = udf.getEvalFunc().exec(null);
+ restoreDefaultTimeZone();
UDFContext.getUDFContext().addJobConf(null);
} catch (IOException e) {
throw new FrontendException(e);
@@ -173,6 +182,21 @@ public abstract class ConstantCalculator
currentWalker.getPlan().replace(op, constantExpr);
}
}
+
+ private void setDefaultTimeZone() {
+ String dtzStr = pc.getProperties().getProperty("pig.datetime.default.tz");
+ if (dtzStr != null && dtzStr.length() > 0) {
+ currentDTZ = DateTimeZone.getDefault();
+ DateTimeZone.setDefault(DateTimeZone.forID(dtzStr));
+ }
+ }
+
+ private void restoreDefaultTimeZone() {
+ if (currentDTZ != null) {
+ DateTimeZone.setDefault(currentDTZ);
+ currentDTZ = null;
+ }
+ }
}
@Override
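[Editor's sketch] The setDefaultTimeZone/restoreDefaultTimeZone pair added above is a save-and-restore of the JVM-wide joda-time default around constant evaluation. A minimal stand-alone sketch of that pattern, assuming only joda-time's DateTimeZone and the pig.datetime.default.tz property used in the hunk (the class name here is illustrative, not part of the commit):

    import java.util.Properties;
    import org.joda.time.DateTimeZone;

    class DefaultTimeZoneScope {
        private DateTimeZone saved;                            // previous default, if we overrode it

        void set(Properties props) {
            String tz = props.getProperty("pig.datetime.default.tz");
            if (tz != null && tz.length() > 0) {
                saved = DateTimeZone.getDefault();             // remember the current default
                DateTimeZone.setDefault(DateTimeZone.forID(tz));
            }
        }

        void restore() {
            if (saved != null) {
                DateTimeZone.setDefault(saved);                // put the original default back
                saved = null;
            }
        }
    }

The restore step matters because DateTimeZone.setDefault is process-wide state; without it, constant folding at compile time would leak the script's time zone into later work in the same JVM.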
Modified: pig/branches/spark/src/org/apache/pig/parser/AliasMasker.g
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/AliasMasker.g?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/parser/AliasMasker.g (original)
+++ pig/branches/spark/src/org/apache/pig/parser/AliasMasker.g Thu Nov 27 12:49:54 2014
@@ -227,7 +227,7 @@ bag_type
: ^( BAG_TYPE IDENTIFIER? tuple_type? )
;
-map_type : ^( MAP_TYPE type? )
+map_type : ^( MAP_TYPE IDENTIFIER? type? )
;
func_clause
Modified: pig/branches/spark/src/org/apache/pig/parser/AstPrinter.g
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/AstPrinter.g?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/parser/AstPrinter.g (original)
+++ pig/branches/spark/src/org/apache/pig/parser/AstPrinter.g Thu Nov 27 12:49:54 2014
@@ -222,7 +222,7 @@ bag_type
: ^( BAG_TYPE { sb.append("bag{"); } ( { sb.append("T:"); } IDENTIFIER? tuple_type )? ) { sb.append("}"); }
;
-map_type : ^( MAP_TYPE { sb.append("map["); } type? ) { sb.append("]"); }
+map_type : ^( MAP_TYPE { sb.append("map["); } IDENTIFIER? type? ) { sb.append("]"); }
;
func_clause
Modified: pig/branches/spark/src/org/apache/pig/parser/AstValidator.g
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/AstValidator.g?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/parser/AstValidator.g (original)
+++ pig/branches/spark/src/org/apache/pig/parser/AstValidator.g Thu Nov 27 12:49:54 2014
@@ -276,7 +276,7 @@ tuple_type : ^( TUPLE_TYPE field_def_lis
bag_type : ^( BAG_TYPE IDENTIFIER? tuple_type? )
;
-map_type : ^( MAP_TYPE type? )
+map_type : ^( MAP_TYPE IDENTIFIER? type? )
;
func_clause : ^( FUNC_REF func_name )
Modified: pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java (original)
+++ pig/branches/spark/src/org/apache/pig/parser/LogicalPlanBuilder.java Thu Nov 27 12:49:54 2014
@@ -100,6 +100,8 @@ import org.apache.pig.newplan.logical.ru
import org.apache.pig.newplan.logical.visitor.ProjStarInUdfExpander;
import org.apache.pig.newplan.logical.visitor.ProjectStarExpander;
import org.apache.pig.newplan.logical.visitor.ResetProjectionAttachedRelationalOpVisitor;
+import org.apache.pig.validator.BlackAndWhitelistFilter;
+import org.apache.pig.validator.PigCommandFilter;
public class LogicalPlanBuilder {
@@ -123,6 +125,8 @@ public class LogicalPlanBuilder {
private int storeIndex = 0;
private int loadIndex = 0;
+ private final BlackAndWhitelistFilter filter;
+
private static NodeIdGenerator nodeIdGen = NodeIdGenerator.getGenerator();
public static long getNextId(String scope) {
@@ -135,6 +139,7 @@ public class LogicalPlanBuilder {
this.scope = scope;
this.fileNameMap = fileNameMap;
this.intStream = input;
+ this.filter = new BlackAndWhitelistFilter(this.pigContext);
}
LogicalPlanBuilder(IntStream input) throws ExecException {
@@ -143,6 +148,7 @@ public class LogicalPlanBuilder {
this.scope = "test";
this.fileNameMap = new HashMap<String, String>();
this.intStream = input;
+ this.filter = new BlackAndWhitelistFilter(this.pigContext);
}
Operator lookupOperator(String alias) {
@@ -158,10 +164,20 @@ public class LogicalPlanBuilder {
}
void defineCommand(String alias, StreamingCommand command) {
+ try {
+ filter.validate(PigCommandFilter.Command.DEFINE);
+ } catch (FrontendException e) {
+ throw new RuntimeException(e.getMessage());
+ }
pigContext.registerStreamCmd( alias, command );
}
void defineFunction(String alias, FuncSpec fs) {
+ try {
+ filter.validate(PigCommandFilter.Command.DEFINE);
+ } catch (FrontendException e) {
+ throw new RuntimeException(e);
+ }
pigContext.registerFunction( alias, fs );
}
Modified: pig/branches/spark/src/org/apache/pig/parser/LogicalPlanGenerator.g
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/LogicalPlanGenerator.g?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/parser/LogicalPlanGenerator.g (original)
+++ pig/branches/spark/src/org/apache/pig/parser/LogicalPlanGenerator.g Thu Nov 27 12:49:54 2014
@@ -430,24 +430,19 @@ tuple_type returns[LogicalSchema logical
bag_type returns[LogicalSchema logicalSchema]
: ^( BAG_TYPE IDENTIFIER? tuple_type? )
{
- if ($tuple_type.logicalSchema!=null && $tuple_type.logicalSchema.size()==1 && $tuple_type.logicalSchema.getField(0).type==DataType.TUPLE) {
- $logicalSchema = $tuple_type.logicalSchema;
- }
- else {
- LogicalSchema s = new LogicalSchema();
- s.addField(new LogicalFieldSchema($IDENTIFIER.text, $tuple_type.logicalSchema, DataType.TUPLE));
- $logicalSchema = s;
- }
+ LogicalSchema s = new LogicalSchema();
+ s.addField(new LogicalFieldSchema($IDENTIFIER.text, $tuple_type.logicalSchema, DataType.TUPLE));
+ $logicalSchema = s;
}
;
map_type returns[LogicalSchema logicalSchema]
- : ^( MAP_TYPE type? )
+ : ^( MAP_TYPE IDENTIFIER? type? )
{
LogicalSchema s = null;
if( $type.datatype != null ) {
s = new LogicalSchema();
- s.addField( new LogicalFieldSchema( null, $type.logicalSchema, $type.datatype ) );
+ s.addField( new LogicalFieldSchema( $IDENTIFIER.text, $type.logicalSchema, $type.datatype ) );
}
$logicalSchema = s;
}
Modified: pig/branches/spark/src/org/apache/pig/parser/QueryParser.g
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/QueryParser.g?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/parser/QueryParser.g (original)
+++ pig/branches/spark/src/org/apache/pig/parser/QueryParser.g Thu Nov 27 12:49:54 2014
@@ -329,7 +329,7 @@ explicit_bag_type : BAG! implicit_bag_ty
explicit_bag_type_cast : BAG LEFT_CURLY explicit_tuple_type_cast? RIGHT_CURLY -> ^( BAG_TYPE_CAST explicit_tuple_type_cast? )
;
-implicit_map_type : LEFT_BRACKET type? RIGHT_BRACKET -> ^( MAP_TYPE type? )
+implicit_map_type : LEFT_BRACKET ( ( identifier_plus COLON )? type )? RIGHT_BRACKET -> ^( MAP_TYPE identifier_plus? type? )
;
explicit_map_type : MAP! implicit_map_type
Modified: pig/branches/spark/src/org/apache/pig/parser/QueryParserDriver.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/parser/QueryParserDriver.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/parser/QueryParserDriver.java (original)
+++ pig/branches/spark/src/org/apache/pig/parser/QueryParserDriver.java Thu Nov 27 12:49:54 2014
@@ -47,12 +47,15 @@ import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileLocalizer.FetchFileRet;
import org.apache.pig.impl.io.ResourceNotFoundException;
+import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.newplan.logical.relational.LogicalSchema;
import org.apache.pig.parser.QueryParser.literal_return;
import org.apache.pig.parser.QueryParser.schema_return;
import org.apache.pig.tools.pigstats.ScriptState;
+import org.apache.pig.validator.BlackAndWhitelistFilter;
+import org.apache.pig.validator.PigCommandFilter;
public class QueryParserDriver {
private static final Log LOG = LogFactory.getLog(QueryParserDriver.class);
@@ -367,8 +370,17 @@ public class QueryParserDriver {
private boolean expandImport(Tree ast) throws ParserException {
List<CommonTree> nodes = new ArrayList<CommonTree>();
traverseImport(ast, nodes);
- if (nodes.isEmpty()) return false;
+ if (nodes.isEmpty())
+ return false;
+ // Validate if imports are enabled/disabled
+ final BlackAndWhitelistFilter filter = new BlackAndWhitelistFilter(
+ this.pigContext);
+ try {
+ filter.validate(PigCommandFilter.Command.IMPORT);
+ } catch (FrontendException e) {
+ throw new ParserException(e.getMessage());
+ }
for (CommonTree t : nodes) {
macroImport(t);
}
@@ -562,6 +574,7 @@ public class QueryParserDriver {
String macroText = null;
try {
+ in.close();
in = new BufferedReader(new StringReader(sb.toString()));
macroText = pigContext.doParamSubstitution(in);
} catch (IOException e) {
Modified: pig/branches/spark/src/org/apache/pig/scripting/Pig.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/scripting/Pig.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/scripting/Pig.java (original)
+++ pig/branches/spark/src/org/apache/pig/scripting/Pig.java Thu Nov 27 12:49:54 2014
@@ -120,7 +120,7 @@ public class Pig {
*/
public static void registerUDF(String udffile, String namespace)
throws IOException {
- LOG.info("Register script UFD file: "+ udffile);
+ LOG.info("Register script UDF file: "+ udffile);
ScriptPigContext ctx = getScriptContext();
ScriptEngine engine = ctx.getScriptEngine();
// script file contains only functions, no need to separate
@@ -349,13 +349,16 @@ public class Pig {
private static String getScriptFromFile(String filename) throws IOException {
LineNumberReader rd = new LineNumberReader(new FileReader(filename));
StringBuilder sb = new StringBuilder();
- String line = rd.readLine();
- while (line != null) {
- sb.append(line);
- sb.append("\n");
- line = rd.readLine();
+ try {
+ String line = rd.readLine();
+ while (line != null) {
+ sb.append(line);
+ sb.append("\n");
+ line = rd.readLine();
+ }
+ } finally {
+ rd.close();
}
- rd.close();
return sb.toString();
}
Modified: pig/branches/spark/src/org/apache/pig/scripting/ScriptingOutputCapturer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/scripting/ScriptingOutputCapturer.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/scripting/ScriptingOutputCapturer.java (original)
+++ pig/branches/spark/src/org/apache/pig/scripting/ScriptingOutputCapturer.java Thu Nov 27 12:49:54 2014
@@ -32,6 +32,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.ExecType;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
+import org.apache.pig.impl.PigImplConstants;
import org.apache.pig.impl.util.UDFContext;
import com.google.common.base.Charsets;
@@ -100,7 +101,7 @@ public class ScriptingOutputCapturer {
log.debug("TaskId: " + taskId);
log.debug("hadoopLogDir: " + hadoopLogDir);
- if (execType.isLocal()) {
+ if (execType.isLocal() || conf.getBoolean(PigImplConstants.CONVERTED_TO_FETCH, false)) {
String logDir = System.getProperty("pig.udf.scripting.log.dir");
if (logDir == null)
logDir = ".";
Modified: pig/branches/spark/src/org/apache/pig/scripting/jython/JythonScriptEngine.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/scripting/jython/JythonScriptEngine.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/scripting/jython/JythonScriptEngine.java (original)
+++ pig/branches/spark/src/org/apache/pig/scripting/jython/JythonScriptEngine.java Thu Nov 27 12:49:54 2014
@@ -300,7 +300,7 @@ public class JythonScriptEngine extends
// determine the relative path that the file should have in the jar
int pos = apath.lastIndexOf(File.separatorChar + name.replace('.', File.separatorChar));
if (pos > 0) {
- files.put(apath.substring(pos), apath);
+ files.put(apath.substring(pos + 1), apath);
} else {
files.put(apath, apath);
}
@@ -409,13 +409,15 @@ public class JythonScriptEngine extends
throw new IOException("Can't read file: " + scriptFile);
}
- // TODO: fis1 is not closed
FileInputStream fis1 = new FileInputStream(scriptFile);
- if (hasFunction(fis1)) {
- registerFunctions(scriptFile, null, pigContext);
+ try {
+ if (hasFunction(fis1)) {
+ registerFunctions(scriptFile, null, pigContext);
+ }
+ } finally {
+ fis1.close();
}
-
Interpreter.setMain(true);
FileInputStream fis = new FileInputStream(scriptFile);
try {
Modified: pig/branches/spark/src/org/apache/pig/scripting/streaming/python/PythonScriptEngine.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/scripting/streaming/python/PythonScriptEngine.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/scripting/streaming/python/PythonScriptEngine.java (original)
+++ pig/branches/spark/src/org/apache/pig/scripting/streaming/python/PythonScriptEngine.java Thu Nov 27 12:49:54 2014
@@ -56,7 +56,12 @@ public class PythonScriptEngine extends
}
FileInputStream fin = new FileInputStream(f);
- List<String[]> functions = getFunctions(fin);
+ List<String[]> functions = null;
+ try {
+ functions = getFunctions(fin);
+ } finally {
+ fin.close();
+ }
namespace = namespace == null ? "" : namespace + NAMESPACE_SEPARATOR;
for(String[] functionInfo : functions) {
String name = functionInfo[0];
@@ -75,7 +80,6 @@ public class PythonScriptEngine extends
execType, isIllustrate
}));
}
- fin.close();
}
@Override
Modified: pig/branches/spark/src/org/apache/pig/tools/grunt/GruntParser.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/grunt/GruntParser.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/grunt/GruntParser.java Thu Nov 27 12:49:54 2014
@@ -45,6 +45,7 @@ import java.util.Set;
import jline.ConsoleReader;
import jline.ConsoleReaderInputStream;
+import org.apache.commons.io.output.NullOutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
@@ -354,7 +355,7 @@ public class GruntParser extends PigScri
List<String> params, List<String> files,
boolean dontPrintOutput)
throws IOException, ParseException {
-
+ filter.validate(PigCommandFilter.Command.EXPLAIN);
if (null != mExplain) {
return;
}
@@ -390,26 +391,9 @@ public class GruntParser extends PigScri
explainCurrentBatch(false);
}
- /**
- * A {@link PrintStream} implementation which does not write anything
- * Used with '-check' command line option to pig Main
- * (through {@link GruntParser#explainCurrentBatch(boolean) } )
- */
- static class NullPrintStream extends PrintStream {
- public NullPrintStream(String fileName) throws FileNotFoundException {
- super(fileName);
- }
- @Override
- public void write(byte[] buf, int off, int len) {}
- @Override
- public void write(int b) {}
- @Override
- public void write(byte [] b) {}
- }
-
protected void explainCurrentBatch(boolean dontPrintOutput) throws IOException {
- PrintStream lp = (dontPrintOutput) ? new NullPrintStream("dummy") : System.out;
- PrintStream ep = (dontPrintOutput) ? new NullPrintStream("dummy") : System.out;
+ PrintStream lp = (dontPrintOutput) ? new PrintStream(new NullOutputStream()) : System.out;
+ PrintStream ep = (dontPrintOutput) ? new PrintStream(new NullOutputStream()) : System.out;
if (!(mExplain.mLast && mExplain.mCount == 0)) {
if (mPigServer.isBatchEmpty()) {
@@ -489,13 +473,20 @@ public class GruntParser extends PigScri
PigContext context = mPigServer.getPigContext();
BufferedReader reader = new BufferedReader(new FileReader(scriptPath));
- return context.doParamSubstitution(reader, params, paramFiles);
+ String result = context.doParamSubstitution(reader, params, paramFiles);
+ reader.close();
+ return result;
}
@Override
protected void processScript(String script, boolean batch,
List<String> params, List<String> files)
throws IOException, ParseException {
+ if(batch) {
+ filter.validate(PigCommandFilter.Command.EXEC);
+ } else {
+ filter.validate(PigCommandFilter.Command.RUN);
+ }
if(mExplain == null) { // process only if not in "explain" mode
if (script == null) {
@@ -1203,10 +1194,16 @@ public class GruntParser extends PigScri
}
public static int runSQLCommand(String hcatBin, String cmd, boolean mInteractive) throws IOException {
- String[] tokens = new String[3];
- tokens[0] = hcatBin;
- tokens[1] = "-e";
- tokens[2] = cmd.substring(cmd.indexOf("sql")).substring(4);
+ List<String> tokensList = new ArrayList<String>();
+ if (hcatBin.endsWith(".py")) {
+ tokensList.add("python");
+ tokensList.add(hcatBin);
+ } else {
+ tokensList.add(hcatBin);
+ }
+ tokensList.add("-e");
+ tokensList.add(cmd.substring(cmd.indexOf("sql")).substring(4).replaceAll("\n", " "));
+ String[] tokens = tokensList.toArray(new String[]{});
// create new environment = environment - HADOOP_CLASSPATH
// This is because of antlr version conflict between Pig and Hive
@@ -1218,7 +1215,7 @@ public class GruntParser extends PigScri
}
}
- log.info("Going to run hcat command: " + tokens[2]);
+ log.info("Going to run hcat command: " + tokens[tokens.length-1]);
Process executor = Runtime.getRuntime().exec(tokens, envSet.toArray(new String[0]));
StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), null, System.out);
StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), null, System.err);
Modified: pig/branches/spark/src/org/apache/pig/tools/parameters/PigFileParser.jj
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/parameters/PigFileParser.jj?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/parameters/PigFileParser.jj (original)
+++ pig/branches/spark/src/org/apache/pig/tools/parameters/PigFileParser.jj Thu Nov 27 12:49:54 2014
@@ -287,7 +287,7 @@ void Parse() throws IOException : {}
void input() throws IOException :
{
String s;
- Token strTok;
+ Token strTok = null;
}
{
strTok = <PIG>
@@ -296,14 +296,16 @@ void input() throws IOException :
out.append(strTok.image );
}
|
- <DECLARE>
+ strTok = <DECLARE>
(
param_value(true) // overwrite=true
+ { pc.validate(strTok.toString()); }
)
|
- <PIGDEFAULT>
+ strTok = <PIGDEFAULT>
(
param_value(false) // overwrite=false
+ { pc.validate(strTok.toString()); }
)
|
s = paramString(){}
Modified: pig/branches/spark/src/org/apache/pig/tools/parameters/PreprocessorContext.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/parameters/PreprocessorContext.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/parameters/PreprocessorContext.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/parameters/PreprocessorContext.java Thu Nov 27 12:49:54 2014
@@ -36,6 +36,11 @@ import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.Shell;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.validator.BlackAndWhitelistFilter;
+import org.apache.pig.validator.PigCommandFilter;
+import org.python.google.common.base.Preconditions;
public class PreprocessorContext {
@@ -44,6 +49,8 @@ public class PreprocessorContext {
// used internally to detect when a param is set multiple times,
// but it set with the same value so it's ok not to log a warning
private Map<String, String> param_source;
+
+ private PigContext pigContext;
public Map<String, String> getParamVal() {
return param_val;
@@ -65,6 +72,10 @@ public class PreprocessorContext {
param_source = new Hashtable<String, String>(paramVal);
}
+ public void setPigContext(PigContext context) {
+ this.pigContext = context;
+ }
+
/*
public void processLiteral(String key, String val) {
processLiteral(key, val, true);
@@ -76,7 +87,7 @@ public class PreprocessorContext {
* @param key - parameter name
* @param val - string containing command to be executed
*/
- public void processShellCmd(String key, String val) throws ParameterSubstitutionException {
+ public void processShellCmd(String key, String val) throws ParameterSubstitutionException, FrontendException {
processShellCmd(key, val, true);
}
@@ -112,13 +123,18 @@ public class PreprocessorContext {
* @param key - parameter name
* @param val - string containing command to be executed
*/
- public void processShellCmd(String key, String val, Boolean overwrite) throws ParameterSubstitutionException {
+ public void processShellCmd(String key, String val, Boolean overwrite) throws ParameterSubstitutionException, FrontendException {
+ if (pigContext != null) {
+ BlackAndWhitelistFilter filter = new BlackAndWhitelistFilter(pigContext);
+ filter.validate(PigCommandFilter.Command.SH);
+ }
if (param_val.containsKey(key)) {
if (param_source.get(key).equals(val) || !overwrite) {
return;
} else {
- log.warn("Warning : Multiple values found for " + key + ". Using value " + val);
+ log.warn("Warning : Multiple values found for " + key
+ + ". Using value " + val);
}
}
@@ -130,6 +146,25 @@ public class PreprocessorContext {
param_val.put(key, sub_val);
}
+ public void validate(String preprocessorCmd) throws FrontendException {
+ if (pigContext == null) {
+ return;
+ }
+
+ final BlackAndWhitelistFilter filter = new BlackAndWhitelistFilter(pigContext);
+ final String declareToken = "%declare";
+ final String defaultToken = "%default";
+
+ if (preprocessorCmd.toLowerCase().equals(declareToken)) {
+ filter.validate(PigCommandFilter.Command.DECLARE);
+ } else if (preprocessorCmd.toLowerCase().equals(defaultToken)) {
+ filter.validate(PigCommandFilter.Command.DEFAULT);
+ } else {
+ throw new IllegalArgumentException("Pig Internal Error. Invalid preprocessor command specified : "
+ + preprocessorCmd);
+ }
+ }
+
/**
* This method generates value for the specified key by
* performing substitution if needed within the value first.
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/JobStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/JobStats.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/JobStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/JobStats.java Thu Nov 27 12:49:54 2014
@@ -21,7 +21,6 @@ package org.apache.pig.tools.pigstats;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -182,17 +181,17 @@ public abstract class JobStats extends O
* @param durations
* @return median value
*/
- protected long calculateMedianValue(long[] durations) {
+ protected long calculateMedianValue(List<Long> durations) {
long median;
// figure out the median
- Arrays.sort(durations);
- int midPoint = durations.length /2;
- if ((durations.length & 1) == 1) {
+ Collections.sort(durations);
+ int midPoint = durations.size() /2;
+ if ((durations.size() & 1) == 1) {
// odd
- median = durations[midPoint];
+ median = durations.get(midPoint);
} else {
// even
- median = (durations[midPoint-1] + durations[midPoint]) / 2;
+ median = (durations.get(midPoint-1) + durations.get(midPoint)) / 2;
}
return median;
}
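[Editor's sketch] The List-based median introduced above reduces to the following stand-alone form (assumes a non-empty list; class and method names are illustrative):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    class MedianSketch {
        static long median(List<Long> durations) {
            Collections.sort(durations);
            int mid = durations.size() / 2;
            if ((durations.size() & 1) == 1) {
                return durations.get(mid);                            // odd count: middle element
            }
            return (durations.get(mid - 1) + durations.get(mid)) / 2; // even count: mean of the two middle elements
        }

        public static void main(String[] args) {
            System.out.println(median(Arrays.asList(4L, 1L, 3L, 2L))); // prints 2 (long division)
        }
    }

Switching the parameter from long[] to List<Long> lets MRJobStats accumulate durations while streaming over the TaskReport iterator (see the MRJobStats.java hunks later in this commit) instead of needing the task count up front.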
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/PigProgressNotificationListener.java Thu Nov 27 12:49:54 2014
@@ -18,12 +18,11 @@
package org.apache.pig.tools.pigstats;
+import org.apache.pig.PigRunner;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
import org.apache.pig.impl.plan.OperatorPlan;
-import org.apache.pig.PigRunner;
-
/**
* Should be implemented by an object that wants to receive notifications
* from {@link PigRunner}.
@@ -33,7 +32,7 @@ import org.apache.pig.PigRunner;
public interface PigProgressNotificationListener extends java.util.EventListener {
/**
- * Invoked before any Hadoop jobs are run with the plan that is to be executed.
+ * Invoked before any Hadoop jobs (or a Tez DAG) are run with the plan that is to be executed.
*
* @param scriptId the unique id of the script
* @param plan the OperatorPlan that is to be executed
@@ -41,36 +40,36 @@ public interface PigProgressNotification
public void initialPlanNotification(String scriptId, OperatorPlan<?> plan);
/**
- * Invoked just before launching Hadoop jobs spawned by the script.
+ * Invoked just before launching Hadoop jobs (or tez DAGs) spawned by the script.
* @param scriptId the unique id of the script
- * @param numJobsToLaunch the total number of Hadoop jobs spawned by the script
+ * @param numJobsToLaunch the total number of Hadoop jobs (or Tez DAGs) spawned by the script
*/
public void launchStartedNotification(String scriptId, int numJobsToLaunch);
/**
- * Invoked just before submitting a batch of Hadoop jobs.
+ * Invoked just before submitting a batch of Hadoop jobs (or Tez DAGs).
* @param scriptId the unique id of the script
- * @param numJobsSubmitted the number of Hadoop jobs in the batch
+ * @param numJobsSubmitted the number of Hadoop jobs (or Tez DAGs) in the batch
*/
public void jobsSubmittedNotification(String scriptId, int numJobsSubmitted);
/**
- * Invoked after a Hadoop job is started.
- * @param scriptId the unique id of the script
- * @param assignedJobId the Hadoop job id
+ * Invoked after a Hadoop job (or Tez DAG) is started.
+ * @param scriptId the unique id of the script
+ * @param assignedJobId the Hadoop job id (or Tez DAG job id)
*/
public void jobStartedNotification(String scriptId, String assignedJobId);
/**
- * Invoked just after a Hadoop job is completed successfully.
- * @param scriptId the unique id of the script
- * @param jobStats the {@link JobStats} object associated with the Hadoop job
+ * Invoked just after a Hadoop job (or Tez DAG) is completed successfully.
+ * @param scriptId the unique id of the script
+ * @param jobStats the {@link JobStats} object associated with the Hadoop job (or Tez DAG)
*/
public void jobFinishedNotification(String scriptId, JobStats jobStats);
/**
* Invoked when a Hadoop job fails.
- * @param scriptId the unique id of the script
+ * @param scriptId the unique id of the script
* @param jobStats the {@link JobStats} object associated with the Hadoop job
*/
public void jobFailedNotification(String scriptId, JobStats jobStats);
@@ -83,16 +82,16 @@ public interface PigProgressNotification
public void outputCompletedNotification(String scriptId, OutputStats outputStats);
/**
- * Invoked to update the execution progress.
+ * Invoked to update the execution progress.
* @param scriptId the unique id of the script
* @param progress the percentage of the execution progress
*/
public void progressUpdatedNotification(String scriptId, int progress);
/**
- * Invoked just after all Hadoop jobs spawned by the script are completed.
+ * Invoked just after all Hadoop jobs (Tez DAGs) spawned by the script are completed.
* @param scriptId the unique id of the script
- * @param numJobsSucceeded the total number of Hadoop jobs succeeded
+ * @param numJobsSucceeded the total number of Hadoop jobs (Tez DAGs) succeeded
*/
public void launchCompletedNotification(String scriptId, int numJobsSucceeded);
}
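[Editor's sketch] The JavaDoc above now describes each callback in terms of Hadoop jobs or Tez DAGs. A no-op implementation can serve as a starting point for custom progress reporting; a minimal sketch (the class name and the two printouts are illustrative, not part of the commit):

    import org.apache.pig.impl.plan.OperatorPlan;
    import org.apache.pig.tools.pigstats.JobStats;
    import org.apache.pig.tools.pigstats.OutputStats;
    import org.apache.pig.tools.pigstats.PigProgressNotificationListener;

    public class LoggingProgressListener implements PigProgressNotificationListener {
        public void initialPlanNotification(String scriptId, OperatorPlan<?> plan) {}
        public void launchStartedNotification(String scriptId, int numJobsToLaunch) {}
        public void jobsSubmittedNotification(String scriptId, int numJobsSubmitted) {}
        public void jobStartedNotification(String scriptId, String assignedJobId) {
            System.out.println(scriptId + ": started job/DAG " + assignedJobId);
        }
        public void jobFinishedNotification(String scriptId, JobStats jobStats) {}
        public void jobFailedNotification(String scriptId, JobStats jobStats) {}
        public void outputCompletedNotification(String scriptId, OutputStats outputStats) {}
        public void progressUpdatedNotification(String scriptId, int progress) {
            System.out.println(scriptId + ": " + progress + "% complete");
        }
        public void launchCompletedNotification(String scriptId, int numJobsSucceeded) {}
    }

Such a listener is typically wired in through PigRunner.run, as noted in the interface JavaDoc; Tez-specific DAG callbacks are handled by the PigTezProgressNotificationListener registration added in TezScriptState.java below.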
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatusReporter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatusReporter.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatusReporter.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatusReporter.java Thu Nov 27 12:49:54 2014
@@ -21,6 +21,8 @@ package org.apache.pig.tools.pigstats;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.StatusReporter;
import org.apache.hadoop.util.Progressable;
+import org.apache.pig.JVMReuseManager;
+import org.apache.pig.StaticDataCleanup;
import org.apache.pig.backend.hadoop.executionengine.TaskContext;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
@@ -33,6 +35,15 @@ public class PigStatusReporter extends S
private TaskContext<?> context = null;
+ static {
+ JVMReuseManager.getInstance().registerForStaticDataCleanup(PigStatusReporter.class);
+ }
+
+ @StaticDataCleanup
+ public static void staticDataCleanup() {
+ reporter = null;
+ }
+
private PigStatusReporter() {
}
@@ -46,10 +57,6 @@ public class PigStatusReporter extends S
return reporter;
}
- public void destroy() {
- context = null;
- }
-
public void setContext(TaskContext<?> context) {
this.context = context;
}
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/ScriptState.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/ScriptState.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/ScriptState.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/ScriptState.java Thu Nov 27 12:49:54 2014
@@ -296,7 +296,7 @@ public abstract class ScriptState {
// restrict the size of the script to be stored in job conf
int maxScriptSize = 10240;
if (pigContext != null) {
- String prop = pigContext.getProperties().getProperty(PigConfiguration.MAX_SCRIPT_SIZE);
+ String prop = pigContext.getProperties().getProperty(PigConfiguration.PIG_SCRIPT_MAX_SIZE);
if (prop != null) {
maxScriptSize = Integer.valueOf(prop);
}
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java Thu Nov 27 12:49:54 2014
@@ -31,9 +31,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.Counters.Counter;
-import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskReport;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapreduce.TaskType;
@@ -70,7 +68,7 @@ public final class MRJobStats extends Jo
}
public static final String SUCCESS_HEADER = "JobId\tMaps\tReduces\t" +
- "MaxMapTime\tMinMapTIme\tAvgMapTime\tMedianMapTime\tMaxReduceTime\t" +
+ "MaxMapTime\tMinMapTime\tAvgMapTime\tMedianMapTime\tMaxReduceTime\t" +
"MinReduceTime\tAvgReduceTime\tMedianReducetime\tAlias\tFeature\tOutputs";
public static final String FAILURE_HEADER = "JobId\tAlias\tFeature\tMessage\tOutputs";
@@ -349,13 +347,13 @@ public final class MRJobStats extends Jo
}
void addMapReduceStatistics(Job job) {
- TaskReport[] maps = null;
+ Iterator<TaskReport> maps = null;
try {
maps = HadoopShims.getTaskReports(job, TaskType.MAP);
} catch (IOException e) {
LOG.warn("Failed to get map task report", e);
}
- TaskReport[] reduces = null;
+ Iterator<TaskReport> reduces = null;
try {
reduces = HadoopShims.getTaskReports(job, TaskType.REDUCE);
} catch (IOException e) {
@@ -364,21 +362,22 @@ public final class MRJobStats extends Jo
addMapReduceStatistics(maps, reduces);
}
- private TaskStat getTaskStat(TaskReport[] tasks) {
- int size = tasks.length;
+ private TaskStat getTaskStat(Iterator<TaskReport> tasks) {
+ int size = 0;
long max = 0;
long min = Long.MAX_VALUE;
long median = 0;
long total = 0;
- long durations[] = new long[size];
+ List<Long> durations = new ArrayList<Long>();
- for (int i = 0; i < tasks.length; i++) {
- TaskReport rpt = tasks[i];
+ while(tasks.hasNext()){
+ TaskReport rpt = tasks.next();
long duration = rpt.getFinishTime() - rpt.getStartTime();
- durations[i] = duration;
+ durations.add(duration);
max = (duration > max) ? duration : max;
min = (duration < min) ? duration : min;
total += duration;
+ size++;
}
long avg = total / size;
@@ -387,8 +386,8 @@ public final class MRJobStats extends Jo
return new TaskStat(size, max, min, avg, median);
}
- private void addMapReduceStatistics(TaskReport[] maps, TaskReport[] reduces) {
- if (maps != null && maps.length > 0) {
+ private void addMapReduceStatistics(Iterator<TaskReport> maps, Iterator<TaskReport> reduces) {
+ if (maps != null && maps.hasNext()) {
TaskStat st = getTaskStat(maps);
setMapStat(st.size, st.max, st.min, st.avg, st.median);
} else {
@@ -398,7 +397,7 @@ public final class MRJobStats extends Jo
}
}
- if (reduces != null && reduces.length > 0) {
+ if (reduces != null && reduces.hasNext()) {
TaskStat st = getTaskStat(reduces);
setReduceStat(st.size, st.max, st.min, st.avg, st.median);
} else {
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java Thu Nov 27 12:49:54 2014
@@ -20,9 +20,10 @@ package org.apache.pig.tools.pigstats.te
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
-import java.util.Iterator;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -31,15 +32,21 @@ import org.apache.pig.LoadFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezOperPlan;
-import org.apache.pig.backend.hadoop.executionengine.tez.TezOperator;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.OperatorPlan;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.newplan.Operator;
import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.PigProgressNotificationListener;
import org.apache.pig.tools.pigstats.PigStats;
-import org.apache.pig.tools.pigstats.PigStats.JobGraph;
import org.apache.pig.tools.pigstats.ScriptState;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
/**
@@ -52,9 +59,8 @@ import com.google.common.collect.Maps;
public class TezScriptState extends ScriptState {
private static final Log LOG = LogFactory.getLog(TezScriptState.class);
- private Map<TezOperator, String> featureMap = null;
- private Map<TezOperator, String> aliasMap = Maps.newHashMap();
- private Map<TezOperator, String> aliasLocationMap = Maps.newHashMap();
+ private List<PigTezProgressNotificationListener> tezListeners = Lists.newArrayList();
+ private Map<String, TezDAGScriptInfo> dagScriptInfo = Maps.newHashMap();
public TezScriptState(String id) {
super(id);
@@ -64,13 +70,48 @@ public class TezScriptState extends Scri
return (TezScriptState) ScriptState.get();
}
- public void addSettingsToConf(TezOperator tezOp, Configuration conf) {
+ @Override
+ public void registerListener(PigProgressNotificationListener listener) {
+ super.registerListener(listener);
+ if (listener instanceof PigTezProgressNotificationListener) {
+ tezListeners.add((PigTezProgressNotificationListener) listener);
+ }
+ }
+
+ public void dagLaunchNotification(String dagName, OperatorPlan<?> dagPlan, int numVerticesToLaunch) {
+ for (PigTezProgressNotificationListener listener: tezListeners) {
+ listener.dagLaunchNotification(id, dagName, dagPlan, numVerticesToLaunch);
+ }
+ }
+
+ public void dagStartedNotification(String dagName, String assignedApplicationId) {
+ for (PigTezProgressNotificationListener listener: tezListeners) {
+ listener.dagStartedNotification(id, dagName, assignedApplicationId);
+ }
+ }
+
+ public void dagProgressNotification(String dagName, int numVerticesCompleted, int progress) {
+ for (PigTezProgressNotificationListener listener: tezListeners) {
+ listener.dagProgressNotification(id, dagName, numVerticesCompleted, progress);
+ }
+ }
+
+ public void dagCompletedNotification(String dagName, TezDAGStats tezDAGStats) {
+ for (PigTezProgressNotificationListener listener: tezListeners) {
+ listener.dagCompletedNotification(id, dagName, tezDAGStats.isSuccessful(), tezDAGStats);
+ }
+ }
+
+ public void addDAGSettingsToConf(Configuration conf) {
LOG.info("Pig script settings are added to the job");
conf.set(PIG_PROPERTY.HADOOP_VERSION.toString(), getHadoopVersion());
conf.set(PIG_PROPERTY.VERSION.toString(), getPigVersion());
conf.set(PIG_PROPERTY.SCRIPT_ID.toString(), id);
conf.set(PIG_PROPERTY.SCRIPT.toString(), getScript());
conf.set(PIG_PROPERTY.COMMAND_LINE.toString(), getCommandLine());
+ }
+
+ public void addVertexSettingsToConf(String dagName, TezOperator tezOp, Configuration conf) {
try {
List<POStore> stores = PlanHelper.getPhysicalOperators(tezOp.plan, POStore.class);
@@ -95,8 +136,8 @@ public class TezScriptState extends Scri
LOG.warn("unable to get the map loads", e);
}
- setPigFeature(tezOp, conf);
- setJobParents(tezOp, conf);
+ setPigFeature(dagName, tezOp, conf);
+ setJobParents(dagName, tezOp, conf);
conf.set("mapreduce.workflow.id", "pig_" + id);
conf.set("mapreduce.workflow.name", getFileName().isEmpty() ? "default" : getFileName());
@@ -116,32 +157,30 @@ public class TezScriptState extends Scri
}
}
- private void setPigFeature(TezOperator tezOp, Configuration conf) {
- conf.set(PIG_PROPERTY.JOB_FEATURE.toString(), getPigFeature(tezOp));
+ private void setPigFeature(String dagName, TezOperator tezOp, Configuration conf) {
+ if (tezOp.isVertexGroup()) {
+ return;
+ }
+ TezDAGScriptInfo dagInfo = getDAGScriptInfo(dagName);
+ conf.set(PIG_PROPERTY.JOB_FEATURE.toString(), dagInfo.getPigFeatures(tezOp));
if (scriptFeatures != 0) {
conf.set(PIG_PROPERTY.SCRIPT_FEATURES.toString(),
String.valueOf(scriptFeatures));
}
- conf.set(PIG_PROPERTY.JOB_ALIAS.toString(), getAlias(tezOp));
- conf.set(PIG_PROPERTY.JOB_ALIAS_LOCATION.toString(), getAliasLocation(tezOp));
+ conf.set(PIG_PROPERTY.JOB_ALIAS.toString(), dagInfo.getAlias(tezOp));
+ conf.set(PIG_PROPERTY.JOB_ALIAS_LOCATION.toString(), dagInfo.getAliasLocation(tezOp));
}
- private void setJobParents(TezOperator tezOp, Configuration conf) {
+ private void setJobParents(String dagName, TezOperator tezOp, Configuration conf) {
+ if (tezOp.isVertexGroup()) {
+ return;
+ }
// PigStats maintains a job DAG with the job id being updated
// upon available. Therefore, before a job is submitted, the ids
// of its parent jobs are already available.
- JobGraph jg = PigStats.get().getJobGraph();
- JobStats js = null;
- Iterator<JobStats> iter = jg.iterator();
- while (iter.hasNext()) {
- JobStats job = iter.next();
- if (job.getName().equals(tezOp.getOperatorKey().toString())) {
- js = job;
- break;
- }
- }
+ JobStats js = ((TezPigScriptStats)PigStats.get()).getVertexStats(dagName, tezOp.getOperatorKey().toString());
if (js != null) {
- List<Operator> preds = jg.getPredecessors(js);
+ List<Operator> preds = js.getPlan().getPredecessors(js);
if (preds != null) {
StringBuilder sb = new StringBuilder();
for (Operator op : preds) {
@@ -154,87 +193,173 @@ public class TezScriptState extends Scri
}
}
- public String getAlias(TezOperator tezOp) {
- if (!aliasMap.containsKey(tezOp)) {
- setAlias(tezOp);
- }
- return aliasMap.get(tezOp);
+ public TezDAGScriptInfo setDAGScriptInfo(TezPlanContainerNode tezPlanNode) {
+ TezDAGScriptInfo info = new TezDAGScriptInfo(tezPlanNode.getTezOperPlan());
+ dagScriptInfo.put(tezPlanNode.getOperatorKey().toString(), info);
+ return info;
}
- private void setAlias(TezOperator tezOp) {
- ArrayList<String> alias = new ArrayList<String>();
- String aliasLocationStr = "";
- try {
- ArrayList<String> aliasLocation = new ArrayList<String>();
- new AliasVisitor(tezOp.plan, alias, aliasLocation).visit();
- aliasLocationStr += LoadFunc.join(aliasLocation, ",");
- if (!alias.isEmpty()) {
- Collections.sort(alias);
- }
- } catch (VisitorException e) {
- LOG.warn("unable to get alias", e);
- }
- aliasMap.put(tezOp, LoadFunc.join(alias, ","));
- aliasLocationMap.put(tezOp, aliasLocationStr);
+ public TezDAGScriptInfo getDAGScriptInfo(String dagName) {
+ return dagScriptInfo.get(dagName);
}
- public String getAliasLocation(TezOperator tezOp) {
- if (!aliasLocationMap.containsKey(tezOp)) {
- setAlias(tezOp);
- }
- return aliasLocationMap.get(tezOp);
- }
+ static class TezDAGScriptInfo {
- public String getPigFeature(TezOperator tezOp) {
- if (featureMap == null) {
- featureMap = Maps.newHashMap();
- }
+ private static final Log LOG = LogFactory.getLog(TezDAGScriptInfo.class);
+ private TezOperPlan tezPlan;
+ private String alias;
+ private String aliasLocation;
+ private String features;
+
+ private Map<OperatorKey, String> featuresMap = Maps.newHashMap();
+ private Map<OperatorKey, String> aliasMap = Maps.newHashMap();
+ private Map<OperatorKey, String> aliasLocationMap = Maps.newHashMap();
+
+ class DAGAliasVisitor extends TezOpPlanVisitor {
+
+ private Set<String> aliases;
+ private Set<String> aliasLocations;
+ private BitSet featureSet;
+
+ public DAGAliasVisitor(TezOperPlan plan) {
+ super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
+ this.aliases = new HashSet<String>();
+ this.aliasLocations = new HashSet<String>();
+ this.featureSet = new BitSet();
+ }
+
+ @Override
+ public void visitTezOp(TezOperator tezOp) throws VisitorException {
+ if (tezOp.isVertexGroup()) {
+ featureSet.set(PIG_FEATURE.UNION.ordinal());
+ return;
+ }
+ ArrayList<String> aliasList = new ArrayList<String>();
+ String aliasLocationStr = "";
+ try {
+ ArrayList<String> aliasLocationList = new ArrayList<String>();
+ new AliasVisitor(tezOp.plan, aliasList, aliasLocationList).visit();
+ aliasLocationStr += LoadFunc.join(aliasLocationList, ",");
+ if (!aliasList.isEmpty()) {
+ Collections.sort(aliasList);
+ aliases.addAll(aliasList);
+ aliasLocations.addAll(aliasLocationList);
+ }
+ } catch (VisitorException e) {
+ LOG.warn("unable to get alias", e);
+ }
+ aliasMap.put(tezOp.getOperatorKey(), LoadFunc.join(aliasList, ","));
+ aliasLocationMap.put(tezOp.getOperatorKey(), aliasLocationStr);
- String retStr = featureMap.get(tezOp);
- if (retStr == null) {
- BitSet feature = new BitSet();
- feature.clear();
- if (tezOp.isSkewedJoin()) {
- feature.set(PIG_FEATURE.SKEWED_JOIN.ordinal());
- }
- if (tezOp.isGlobalSort()) {
- feature.set(PIG_FEATURE.ORDER_BY.ordinal());
- }
- if (tezOp.isSampler()) {
- feature.set(PIG_FEATURE.SAMPLER.ordinal());
- }
- if (tezOp.isIndexer()) {
- feature.set(PIG_FEATURE.INDEXER.ordinal());
- }
- if (tezOp.isCogroup()) {
- feature.set(PIG_FEATURE.COGROUP.ordinal());
- }
- if (tezOp.isGroupBy()) {
- feature.set(PIG_FEATURE.GROUP_BY.ordinal());
- }
- if (tezOp.isRegularJoin()) {
- feature.set(PIG_FEATURE.HASH_JOIN.ordinal());
- }
- if (tezOp.isUnion()) {
- feature.set(PIG_FEATURE.UNION.ordinal());
+
+ BitSet feature = new BitSet();
+ feature.clear();
+ if (tezOp.isSkewedJoin()) {
+ feature.set(PIG_FEATURE.SKEWED_JOIN.ordinal());
+ }
+ if (tezOp.isGlobalSort()) {
+ feature.set(PIG_FEATURE.ORDER_BY.ordinal());
+ }
+ if (tezOp.isSampler()) {
+ feature.set(PIG_FEATURE.SAMPLER.ordinal());
+ }
+ if (tezOp.isIndexer()) {
+ feature.set(PIG_FEATURE.INDEXER.ordinal());
+ }
+ if (tezOp.isCogroup()) {
+ feature.set(PIG_FEATURE.COGROUP.ordinal());
+ }
+ if (tezOp.isGroupBy()) {
+ feature.set(PIG_FEATURE.GROUP_BY.ordinal());
+ }
+ if (tezOp.isRegularJoin()) {
+ feature.set(PIG_FEATURE.HASH_JOIN.ordinal());
+ }
+ if (tezOp.isUnion()) {
+ feature.set(PIG_FEATURE.UNION.ordinal());
+ }
+ if (tezOp.isNative()) {
+ feature.set(PIG_FEATURE.NATIVE.ordinal());
+ }
+ if (tezOp.isLimit() || tezOp.isLimitAfterSort()) {
+ feature.set(PIG_FEATURE.LIMIT.ordinal());
+ }
+ try {
+ new FeatureVisitor(tezOp.plan, feature).visit();
+ } catch (VisitorException e) {
+ LOG.warn("Feature visitor failed", e);
+ }
+ StringBuilder sb = new StringBuilder();
+ for (int i=feature.nextSetBit(0); i>=0; i=feature.nextSetBit(i+1)) {
+ if (sb.length() > 0) sb.append(",");
+ sb.append(PIG_FEATURE.values()[i].name());
+ }
+ featuresMap.put(tezOp.getOperatorKey(), sb.toString());
+ for (int i=0; i < feature.length(); i++) {
+ if (feature.get(i)) {
+ featureSet.set(i);
+ }
+ }
}
- if (tezOp.isNative()) {
- feature.set(PIG_FEATURE.NATIVE.ordinal());
+
+ @Override
+ public void visit() throws VisitorException {
+ super.visit();
+ if (!aliases.isEmpty()) {
+ ArrayList<String> aliasList = new ArrayList<String>(aliases);
+ ArrayList<String> aliasLocationList = new ArrayList<String>(aliasLocations);
+ Collections.sort(aliasList);
+ Collections.sort(aliasLocationList);
+ alias = LoadFunc.join(aliasList, ",");
+ aliasLocation = LoadFunc.join(aliasLocationList, ",");
+ }
+ StringBuilder sb = new StringBuilder();
+ for (int i = featureSet.nextSetBit(0); i >= 0; i = featureSet.nextSetBit(i+1)) {
+ if (sb.length() > 0) sb.append(",");
+ sb.append(PIG_FEATURE.values()[i].name());
+ }
+ features = sb.toString();
}
+
+ }
+
+ public TezDAGScriptInfo(TezOperPlan tezPlan) {
+ this.tezPlan = tezPlan;
+ initialize();
+ }
+
+ private void initialize() {
try {
- new FeatureVisitor(tezOp.plan, feature).visit();
+ new DAGAliasVisitor(tezPlan).visit();
} catch (VisitorException e) {
- LOG.warn("Feature visitor failed", e);
+ LOG.warn("Cannot calculate alias information for DAG", e);
}
- StringBuilder sb = new StringBuilder();
- for (int i=feature.nextSetBit(0); i>=0; i=feature.nextSetBit(i+1)) {
- if (sb.length() > 0) sb.append(",");
- sb.append(PIG_FEATURE.values()[i].name());
- }
- retStr = sb.toString();
- featureMap.put(tezOp, retStr);
}
- return retStr;
+
+ public String getAlias() {
+ return alias;
+ }
+
+ public String getAliasLocation() {
+ return aliasLocation;
+ }
+
+ public String getPigFeatures() {
+ return features;
+ }
+
+ public String getAlias(TezOperator tezOp) {
+ return aliasMap.get(tezOp.getOperatorKey());
+ }
+
+ public String getAliasLocation(TezOperator tezOp) {
+ return aliasLocationMap.get(tezOp.getOperatorKey());
+ }
+
+ public String getPigFeatures(TezOperator tezOp) {
+ return featuresMap.get(tezOp.getOperatorKey());
+ }
+
}
}
Modified: pig/branches/spark/src/org/apache/pig/validator/BlackAndWhitelistFilter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/validator/BlackAndWhitelistFilter.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/validator/BlackAndWhitelistFilter.java (original)
+++ pig/branches/spark/src/org/apache/pig/validator/BlackAndWhitelistFilter.java Thu Nov 27 12:49:54 2014
@@ -35,12 +35,17 @@ public final class BlackAndWhitelistFilt
private static final Splitter SPLITTER = Splitter.on(',').trimResults()
.omitEmptyStrings();
- private final PigServer pigServer;
+ private final PigContext context;
private final Set<String> whitelist;
private final Set<String> blacklist;
public BlackAndWhitelistFilter(PigServer pigServer) {
- this.pigServer = pigServer;
+ this(pigServer.getPigContext());
+ }
+
+ public BlackAndWhitelistFilter(PigContext context) {
+ this.context = context;
+
whitelist = Sets.newHashSet();
blacklist = Sets.newHashSet();
@@ -48,7 +53,6 @@ public final class BlackAndWhitelistFilt
}
private void init() {
- PigContext context = pigServer.getPigContext();
String whitelistConfig = context.getProperties().getProperty(PigConfiguration.PIG_WHITELIST);
if (whitelistConfig != null) {
Modified: pig/branches/spark/src/org/apache/pig/validator/PigCommandFilter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/validator/PigCommandFilter.java?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/validator/PigCommandFilter.java (original)
+++ pig/branches/spark/src/org/apache/pig/validator/PigCommandFilter.java Thu Nov 27 12:49:54 2014
@@ -34,7 +34,7 @@ import org.apache.pig.impl.logicalLayer.
public interface PigCommandFilter {
public enum Command {
- FS, LS, SH, MAPREDUCE, REGISTER, SET, CAT, CD, DUMP, KILL, PWD, MV, CP, COPYTOLOCAL, COPYFROMLOCAL, MKDIR, RM, RMF, ILLUSTRATE
+ DEFINE, DECLARE, DEFAULT, EXPLAIN, EXEC, FS, IMPORT, LS, SH, MAPREDUCE, REGISTER, SET, CAT, CD, DUMP, KILL, PWD, MV, CP, COPYTOLOCAL, COPYFROMLOCAL, MKDIR, RM, RMF, RUN, ILLUSTRATE
}
/**
Modified: pig/branches/spark/src/python/streaming/controller.py
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/python/streaming/controller.py?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/src/python/streaming/controller.py (original)
+++ pig/branches/spark/src/python/streaming/controller.py Thu Nov 27 12:49:54 2014
@@ -96,7 +96,7 @@ class PythonStreamingController:
if is_illustrate or udf_logging.udf_log_level != logging.DEBUG:
#Only log output for illustrate after we get the flag to capture output.
- sys.stdout = open("/dev/null", 'w')
+ sys.stdout = open(os.devnull, 'w')
else:
sys.stdout = self.output_stream
@@ -157,7 +157,14 @@ class PythonStreamingController:
input_str = input_stream.readline()
while input_str.endswith(END_RECORD_DELIM) == False:
- input_str += input_stream.readline()
+ line = input_stream.readline()
+ if line == '':
+ input_str = ''
+ break
+ input_str += line
+
+ if input_str == '':
+ return END_OF_STREAM
if input_str == TURN_ON_OUTPUT_CAPTURING:
logging.debug("Turned on Output Capturing")
@@ -297,6 +304,12 @@ def _deserialize_collection(input_str, r
else:
return list_result
+def wrap_tuple(o, serialized_item):
+ if type(o) != tuple:
+ return WRAPPED_TUPLE_START + serialized_item + WRAPPED_TUPLE_END
+ else:
+ return serialized_item
+
def serialize_output(output, utfEncodeAllFields=False):
"""
@param utfEncodeStrings - Generally we want to utf encode only strings. But for
@@ -314,7 +327,7 @@ def serialize_output(output, utfEncodeAl
WRAPPED_TUPLE_END)
elif output_type == list:
return (WRAPPED_BAG_START +
- WRAPPED_FIELD_DELIMITER.join([serialize_output(o, utfEncodeAllFields) for o in output]) +
+ WRAPPED_FIELD_DELIMITER.join([wrap_tuple(o, serialize_output(o, utfEncodeAllFields)) for o in output]) +
WRAPPED_BAG_END)
elif output_type == dict:
return (WRAPPED_MAP_START +
Modified: pig/branches/spark/test/e2e/harness/TestDriver.pm
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/harness/TestDriver.pm?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/harness/TestDriver.pm (original)
+++ pig/branches/spark/test/e2e/harness/TestDriver.pm Thu Nov 27 12:49:54 2014
@@ -717,7 +717,7 @@ sub runTestGroup() {
$testStatuses->{$testName} = $failedStr;
}
- $msg .= "\nEnding test $testName at " . time ."\n";
+ $msg .= " at " . time . "\nEnding test $testName at " . time ."\n";
print $subLog $msg;
$duration = $endTime - $beginTime;
$dbinfo{'duration'} = $duration;
Modified: pig/branches/spark/test/e2e/pig/build.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/build.xml?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/build.xml (original)
+++ pig/branches/spark/test/e2e/pig/build.xml Thu Nov 27 12:49:54 2014
@@ -18,15 +18,14 @@
<project name="TestHarnessPigTests" default="test">
<property name="pig.dir" value="${basedir}/../../.."/>
- <property name="ivy.dir" location="${pig.dir}/ivy" />
+ <property name="pig.base.dir" value="${basedir}/../../.."/>
+ <property name="ivy.dir" location="${pig.base.dir}/ivy" />
<loadproperties srcfile="${ivy.dir}/libraries.properties"/>
- <property name="jython.jar"
- value="${pig.dir}/build/ivy/lib/Pig/jython-standalone-${jython.version}.jar"/>
- <property name="jruby.jar"
- value="${pig.dir}/build/ivy/lib/Pig/jruby-complete-${jruby.version}.jar"/>
+ <property name="piggybank.jar"
+ value="${pig.base.dir}/contrib/piggybank/java/piggybank.jar"/>
<property name="hive.lib.dir"
- value="${pig.dir}/build/ivy/lib/Pig"/>
+ value="${pig.base.dir}/build/ivy/lib/Pig"/>
<condition property="hive.hadoop.shims.version" value="0.23" else="0.20S">
<equals arg1="${hadoopversion}" arg2="23" />
@@ -216,23 +215,14 @@
<!-- Check that the necessary properties are setup -->
<target name="property-check">
- <fail message="Please set the property harness.cluster.conf to the conf directory of your hadoop installation or harness.hadoop.home to HADOOP_HOME for your Hadoop installation.">
+ <fail message="Please set the property harness.cluster.conf to the conf directory of your hadoop installation,harness.hadoop.home to HADOOP_HOME for your Hadoop installation and harness.cluster.bin to the binary executable of your hadoop installation.">
<condition>
<not>
- <or>
+ <and>
<isset property="harness.cluster.conf"/>
<isset property="harness.hadoop.home"/>
- </or>
- </not>
- </condition>
- </fail>
- <fail message="Please set the property harness.cluster.bin to the binary executable of your hadoop installation or harness.hadoop.home to HADOOP_HOME for your Hadoop installation.">
- <condition>
- <not>
- <or>
<isset property="harness.cluster.bin"/>
- <isset property="harness.hadoop.home"/>
- </or>
+ </and>
</not>
</condition>
</fail>
@@ -279,6 +269,7 @@
<env key="HADOOP_CONF_DIR" value="${harness.cluster.conf}"/>
<env key="PIG_USE_PYTHON" value="${harness.use.python}"/>
<env key="PH_CLUSTER_BIN" value="${harness.cluster.bin}"/>
+ <env key="PH_PIGGYBANK_JAR" value="${piggybank.jar}"/>
<env key="PH_HIVE_LIB_DIR" value="${hive.lib.dir}"/>
<env key="PH_HIVE_VERSION" value="${hive.version}"/>
<env key="PH_HIVE_SHIMS_VERSION" value="${hive.hadoop.shims.version}"/>
Modified: pig/branches/spark/test/e2e/pig/conf/default.conf
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/conf/default.conf?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/conf/default.conf (original)
+++ pig/branches/spark/test/e2e/pig/conf/default.conf Thu Nov 27 12:49:54 2014
@@ -49,7 +49,7 @@ $cfg = {
, 'testconfigpath' => "$ENV{HADOOP_CONF_DIR}"
, 'funcjarPath' => "$ENV{PH_ROOT}/lib/java"
, 'paramPath' => "$ENV{PH_ROOT}/paramfiles"
- , 'piggybankjarPath' => "$ENV{PH_PIG}/contrib/piggybank/java"
+ , 'piggybankjarPath' => "$ENV{PH_PIGGYBANK_JAR}"
, 'pigpath' => "$ENV{PH_PIG}"
, 'oldpigpath' => "$ENV{PH_OLDPIG}"
, 'hcatbin' => "$ENV{HCAT_BIN}"
Modified: pig/branches/spark/test/e2e/pig/conf/local.conf
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/conf/local.conf?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/conf/local.conf (original)
+++ pig/branches/spark/test/e2e/pig/conf/local.conf Thu Nov 27 12:49:54 2014
@@ -15,7 +15,7 @@
# limitations under the License.
my $me = `whoami`;
-chomp $me;
+$me =~ s/[^a-zA-Z0-9]*//g;
# The contents of this file can be rewritten to fit your installation.
# Also, you can define the following environment variables and set things up as in the test setup
@@ -40,13 +40,13 @@ $cfg = {
#TEST
, 'benchmarkPath' => "$ENV{PH_OUT}/benchmarks"
, 'scriptPath' => "$ENV{PH_ROOT}/libexec"
- , 'tmpPath' => '/tmp/pigtest'
+ , 'tmpPath' => 'tmp/pigtest'
#PIG
, 'testconfigpath' => "$ENV{PH_CLUSTER}/conf/"
, 'funcjarPath' => "$ENV{PH_ROOT}/lib/java"
, 'paramPath' => "$ENV{PH_ROOT}/paramfiles"
- , 'piggybankjarPath' => "$ENV{PH_PIG}/contrib/piggybank/java"
+ , 'piggybankjarPath' => "$ENV{PH_PIGGYBANK_JAR}"
, 'pigpath' => "$ENV{PH_PIG}"
, 'oldpigpath' => "$ENV{PH_OLDPIG}"
, 'exectype' => 'local'
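
The chomp-to-regex change in local.conf above (repeated in rpm.conf and tez.conf below) strips every non-alphanumeric character from the `whoami` output, not just the trailing newline, so user names containing dots, backslashes or spaces collapse to letters and digits. The effect, shown as a Python analogue purely for illustration:

import re

def sanitize_user(me):
    # Same effect as the Perl substitution  $me =~ s/[^a-zA-Z0-9]*//g;
    # (the old `chomp $me` only removed the trailing newline)
    return re.sub(r'[^a-zA-Z0-9]', '', me)

assert sanitize_user("jenkins\n") == "jenkins"
assert sanitize_user("DOMAIN\\test.user\n") == "DOMAINtestuser"
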
Modified: pig/branches/spark/test/e2e/pig/conf/rpm.conf
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/conf/rpm.conf?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/conf/rpm.conf (original)
+++ pig/branches/spark/test/e2e/pig/conf/rpm.conf Thu Nov 27 12:49:54 2014
@@ -15,7 +15,7 @@
# limitations under the License.
my $me = `whoami`;
-chomp $me;
+$me =~ s/[^a-zA-Z0-9]*//g;
# The contents of this file can be rewritten to fit your installation.
# Also, you can define the following environment variables and set things up as in the test setup
@@ -43,14 +43,13 @@ $cfg = {
#TEST
, 'benchmarkPath' => "$ENV{PH_OUT}/benchmarks"
, 'scriptPath' => "$ENV{PH_ROOT}/libexec"
- , 'tmpPath' => '/tmp/pigtest'
+ , 'tmpPath' => 'tmp/pigtest'
#PIG
, 'testconfigpath' => "$ENV{HADOOP_CONF_DIR}"
, 'funcjarPath' => "$ENV{PH_ROOT}/lib/java"
- , 'piggybankjarPath' => "/usr/lib/pig"
+ , 'piggybankjarPath' => "/usr/lib/pig/lib/piggybank.jar"
, 'paramPath' => "$ENV{PH_ROOT}/paramfiles"
- , 'piggybankjarPath' => "/usr/lib/pig"
, 'pigpath' => "/usr"
, 'oldpigpath' => "$ENV{PH_OLDPIG}"
, 'hcatbin' => "$ENV{HCAT_BIN}"
Modified: pig/branches/spark/test/e2e/pig/conf/tez.conf
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/conf/tez.conf?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/conf/tez.conf (original)
+++ pig/branches/spark/test/e2e/pig/conf/tez.conf Thu Nov 27 12:49:54 2014
@@ -15,7 +15,7 @@
# limitations under the License.
my $me = `whoami`;
-chomp $me;
+$me =~ s/[^a-zA-Z0-9]*//g;
# The contents of this file can be rewritten to fit your installation.
# Also, you can define the following environment variables and set things up as in the test setup
@@ -43,13 +43,13 @@ $cfg = {
#TEST
, 'benchmarkPath' => "$ENV{PH_OUT}/benchmarks"
, 'scriptPath' => "$ENV{PH_ROOT}/libexec"
- , 'tmpPath' => '/tmp/pigtest'
+ , 'tmpPath' => 'tmp/pigtest'
#PIG
, 'testconfigpath' => "$ENV{HADOOP_CONF_DIR}"
, 'funcjarPath' => "$ENV{PH_ROOT}/lib/java"
, 'paramPath' => "$ENV{PH_ROOT}/paramfiles"
- , 'piggybankjarPath' => "$ENV{PH_PIG}/contrib/piggybank/java"
+ , 'piggybankjarPath' => "$ENV{PH_PIGGYBANK_JAR}"
, 'pigpath' => "$ENV{PH_PIG}"
, 'oldpigpath' => "$ENV{PH_OLDPIG}"
, 'hcatbin' => "$ENV{HCAT_BIN}"
Modified: pig/branches/spark/test/e2e/pig/deployers/LocalDeployer.pm
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/deployers/LocalDeployer.pm?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/deployers/LocalDeployer.pm (original)
+++ pig/branches/spark/test/e2e/pig/deployers/LocalDeployer.pm Thu Nov 27 12:49:54 2014
@@ -120,11 +120,6 @@ sub generateData
'rows' => 10000,
'outfile' => "singlefile/studenttab10k",
}, {
- 'name' => "studenttab20m",
- 'filetype' => "studenttab",
- 'rows' => 20000000,
- 'outfile' => "singlefile/studenttab20m",
- }, {
'name' => "votertab10k",
'filetype' => "votertab",
'rows' => 10000,
@@ -210,11 +205,6 @@ sub generateData
'rows' => 5000,
'outfile' => "types/numbers.txt",
}, {
- 'name' => "biggish",
- 'filetype' => "biggish",
- 'rows' => 1000000,
- 'outfile' => "singlefile/biggish",
- }, {
'name' => "prerank",
'filetype' => "ranking",
'rows' => 30,
Modified: pig/branches/spark/test/e2e/pig/drivers/TestDriverPig.pm
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/drivers/TestDriverPig.pm?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/drivers/TestDriverPig.pm (original)
+++ pig/branches/spark/test/e2e/pig/drivers/TestDriverPig.pm Thu Nov 27 12:49:54 2014
@@ -73,7 +73,7 @@ sub replaceParameters
$cmd =~ s/:INPATH:/$testCmd->{'inpathbase'}/g;
$cmd =~ s/:OUTPATH:/$outfile/g;
$cmd =~ s/:FUNCPATH:/$testCmd->{'funcjarPath'}/g;
- $cmd =~ s/:PIGGYBANKPATH:/$testCmd->{'piggybankjarPath'}/g;
+ $cmd =~ s/:PIGGYBANKJAR:/$testCmd->{'piggybankjarPath'}/g;
$cmd =~ s/:PIGPATH:/$testCmd->{'pigpath'}/g;
$cmd =~ s/:RUNID:/$testCmd->{'UID'}/g;
$cmd =~ s/:USRHOMEPATH:/$testCmd->{'userhomePath'}/g;
@@ -672,6 +672,9 @@ sub generateBenchmark
if ((Util::isWindows()||Util::isCygwin()) && $testCmd->{'pig_win'}) {
$modifiedTestCmd{'pig'} = $testCmd->{'pig_win'};
}
+ if ( $testCmd->{'hadoopversion'} == '23' && $testCmd->{'pig23'}) {
+ $modifiedTestCmd{'pig'} = $testCmd->{'pig23'};
+ }
# Change so we're looking at the old version of Pig
if (defined $testCmd->{'oldpigpath'} && $testCmd->{'oldpigpath'} ne "") {
$modifiedTestCmd{'pigpath'} = $testCmd->{'oldpigpath'};
Modified: pig/branches/spark/test/e2e/pig/tests/bigdata.conf
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/bigdata.conf?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/tests/bigdata.conf (original)
+++ pig/branches/spark/test/e2e/pig/tests/bigdata.conf Thu Nov 27 12:49:54 2014
@@ -28,11 +28,12 @@ $cfg = {
'groups' => [
{
- 'name' => 'BigData',
+ 'name' => 'BigData_Checkin',
'tests' => [
{
'num' => 1,
,'floatpostprocess' => 1
+ ,'java_params' => ['-Dpig.tez.auto.parallelism=false']
,'delimiter' => ' ',
'pig' => q\a = load ':INPATH:/singlefile/studenttab20m' as (name, age, gpa);
b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
@@ -48,18 +49,6 @@ store i into ':OUTPATH:';\,
{
'num' => 2,
'pig' => q\a = load ':INPATH:/singlefile/studenttab20m' as (name, age, gpa);
-a1 = filter a by age < '50';
-b = group a1 by (name, age);
-c = foreach b generate group as g, AVG(a1.gpa);
-d = filter c by $1 > 3.0;
-d1 = foreach d generate g.$0 as name, g.$1 as age, $1 as gpa;
-e = group d1 by name;
-f = foreach e generate group, AVG(d1.age);
-store f into ':OUTPATH:';\,
- },
- {
- 'num' => 3,
- 'pig' => q\a = load ':INPATH:/singlefile/studenttab20m' as (name, age, gpa);
b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
c = foreach a generate name;
d = foreach b generate name;
@@ -68,57 +57,84 @@ f = union d, e;
g = distinct f parallel 20;
store g into ':OUTPATH:';\,
},
- {
- 'num' => 4,
- 'pig' => q\a = load ':INPATH:/singlefile/studenttab20m' as (name, age, gpa);
+ ]
+ },
+ {
+ 'name' => 'BigData_Group',
+ 'tests' => [
+ {
+ 'num' => 1,
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab20m' as (name, age, gpa);
+a1 = filter a by age < '50';
+b = group a1 by (name, age);
+c = foreach b generate group as g, AVG(a1.gpa);
+d = filter c by $1 > 3.0;
+d1 = foreach d generate g.$0 as name, g.$1 as age, $1 as gpa;
+e = group d1 by name;
+f = foreach e generate group, AVG(d1.age);
+store f into ':OUTPATH:';\,
+ },
+ {
+ 'num' => 2,
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab20m' as (name, age, gpa);
b = group a all parallel 20;
c = foreach b generate COUNT(a.$0);
store c into ':OUTPATH:';\,
- },
- {
- 'num' => 5,
- 'pig' => q\a = load ':INPATH:/singlefile/studenttab20m' as (name, age, gpa);
+ },
+ {
+ 'num' => 3,
+ 'java_params' => ['-Dpig.exec.mapPartAgg=true'],
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab20m' as (name, age, gpa);
b = group a by name parallel 20;
c = foreach b generate group, COUNT($1);
store c into ':OUTPATH:';\,
- },
- {
- 'num' => 6,
- 'pig' => q\
+ },
+ ]
+ },
+ {
+ 'name' => 'BigData_Stream',
+ 'tests' => [
+ {
+ 'num' => 1,
+ 'pig' => q\
define cmd `perl PigStreaming.pl` ship(':SCRIPTHOMEPATH:/PigStreaming.pl') stderr('CMD' limit 3);
a = load ':INPATH:/singlefile/studenttab20m' as (name, age, gpa);
b = stream a through cmd as (n, a, g);
c = foreach b generate n, a;
store c into ':OUTPATH:';\,
- },
- {
- 'num' => 7,
- 'pig' => q\a = load ':INPATH:/singlefile/studenttab20m' as (name, age, gpa);
+ },
+ ]
+ },
+ {
+ 'name' => 'BigData_Order',
+ 'tests' => [
+ {
+ 'num' => 1,
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab20m' as (name, age, gpa);
b = order a by name parallel 20;
store b into ':OUTPATH:';\,
- 'sortArgs' => ['-t', ' ', '-k', '1,1'],
- },
- {
- 'num' => 8,
- 'pig' => q\a = load ':INPATH:/singlefile/studenttab20m' as (name:chararray, age:int, gpa:double);
+ 'sortArgs' => ['-t', ' ', '-k', '1,1'],
+ },
+ {
+ 'num' => 2,
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab20m' as (name:chararray, age:int, gpa:double);
b = order a by name, age desc parallel 20;
store b into ':OUTPATH:';\,
- 'sortArgs' => ['-t', ' ', '-k', '1,1', '-k', '2nr,2nr'],
- },
- {
- 'num' => 9,
- 'pig' => q\A = load ':INPATH:/singlefile/studenttab20m' as (name, age, gpa);
+ 'sortArgs' => ['-t', ' ', '-k', '1,1', '-k', '2nr,2nr'],
+ },
+ {
+ 'num' => 3,
+ 'pig' => q\A = load ':INPATH:/singlefile/studenttab20m' as (name, age, gpa);
B = filter A by age > 20;
C = group B by name;
D = foreach C generate group, COUNT(B) PARALLEL 16;
E = order D by $0 PARALLEL 16;
F = limit E 10;
store F into ':OUTPATH:';\,
- 'sortArgs' => ['-t', ' ', '-k', '1,1'],
- },
-
- ]
- },
+ 'sortArgs' => ['-t', ' ', '-k', '1,1'],
+ },
+ ]
+ },
]
}
;
Modified: pig/branches/spark/test/e2e/pig/tests/grunt.conf
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/grunt.conf?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/tests/grunt.conf (original)
+++ pig/branches/spark/test/e2e/pig/tests/grunt.conf Thu Nov 27 12:49:54 2014
@@ -44,7 +44,7 @@ $cfg = {
'num' => 2,
'pig' => "pwd",
'execonly' => 'mapred,tez', # don't have a clue what their cwd will be for local mode
- 'expected_out_regex' => "hdfs:",
+ 'expected_out_regex' => "/user",
'rc' => 0
},{
@@ -55,10 +55,8 @@ $cfg = {
},{
'num' => 6,
- 'pig' => q\
-sh touch /bin/bad
-\,
- ,'expected_err_regex' => "Permission denied"
+ 'pig' => "cat nonexist"
+ ,'expected_err_regex' => "does not exist"
,'rc' => 5
},{
Modified: pig/branches/spark/test/e2e/pig/tests/macro.conf
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/macro.conf?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/tests/macro.conf (original)
+++ pig/branches/spark/test/e2e/pig/tests/macro.conf Thu Nov 27 12:49:54 2014
@@ -373,10 +373,16 @@ $cfg = {
A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
x = test(A);
store x into ':OUTPATH:';#,
+ 'pig_win' => q#define CMD `perl -ne "print $_;"`;
+ define test(in) returns B {
+ $B = stream $in through CMD as (name, age, gpa);
+ }
+
+ A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+ x = test(A);
+ store x into ':OUTPATH:';#,
'verify_pig_script' => q#A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
- define CMD `perl -ne 'print $_;'`;
- B = stream A through CMD as (name, age, gpa);
- store B into ':OUTPATH:';#,
+ store A into ':OUTPATH:';#,
'floatpostprocess' => 1,
'delimiter' => ' '
},
Modified: pig/branches/spark/test/e2e/pig/tests/multiquery.conf
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/multiquery.conf?rev=1642132&r1=1642131&r2=1642132&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/tests/multiquery.conf (original)
+++ pig/branches/spark/test/e2e/pig/tests/multiquery.conf Thu Nov 27 12:49:54 2014
@@ -236,6 +236,14 @@ $cfg = {
C = stream B through CMD2 as (name, age, gpa);
D = JOIN B by name, C by name;
store D into ':OUTPATH:.2'; #,
+ 'pig_win' => q# define CMD1 `perl -ne "print $_;"`;
+ define CMD2 `perl -ne "print $_;"`;
+ A = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+ B = stream A through CMD1 as (name, age, gpa);
+ store B into ':OUTPATH:.1';
+ C = stream B through CMD2 as (name, age, gpa);
+ D = JOIN B by name, C by name;
+ store D into ':OUTPATH:.2'; #,
'sql' => "select name, age, gpa from studenttab10k;
select A.name, A.age, A.gpa, B.name, B.age, B.gpa
from studenttab10k as A join studenttab10k as B using(name);",