You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/03/27 17:54:22 UTC
svn commit: r641888 - in /incubator/pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/
src/org/apache/pig/backend/local/executionengine/ src/org/apache/p...
Author: olga
Date: Thu Mar 27 09:54:10 2008
New Revision: 641888
URL: http://svn.apache.org/viewvc?rev=641888&view=rev
Log:
PIG-94: M2 - I/O, ship/cache, error handling
Modified:
incubator/pig/trunk/CHANGES.txt
incubator/pig/trunk/src/org/apache/pig/PigServer.java
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigInputFormat.java
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java
incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POCogroup.java
incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POEval.java
incubator/pig/trunk/src/org/apache/pig/impl/PigContext.java
incubator/pig/trunk/src/org/apache/pig/impl/eval/BinCondSpec.java
incubator/pig/trunk/src/org/apache/pig/impl/eval/CompositeEvalSpec.java
incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpec.java
incubator/pig/trunk/src/org/apache/pig/impl/eval/FilterSpec.java
incubator/pig/trunk/src/org/apache/pig/impl/eval/FuncEvalSpec.java
incubator/pig/trunk/src/org/apache/pig/impl/eval/GenerateSpec.java
incubator/pig/trunk/src/org/apache/pig/impl/eval/SimpleEvalSpec.java
incubator/pig/trunk/src/org/apache/pig/impl/eval/SortDistinctSpec.java
incubator/pig/trunk/src/org/apache/pig/impl/eval/StarSpec.java
incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java
incubator/pig/trunk/src/org/apache/pig/impl/eval/window/TimeWindowSpec.java
incubator/pig/trunk/src/org/apache/pig/impl/eval/window/TupleWindowSpec.java
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
incubator/pig/trunk/src/org/apache/pig/impl/streaming/PigExecutableManager.java
incubator/pig/trunk/src/org/apache/pig/impl/util/JarManager.java
incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java
Modified: incubator/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Thu Mar 27 09:54:10 2008
@@ -181,3 +181,6 @@
PIG-154: moving parsing for DEFINE and STORE into QueryParser
PIG-100: improved error handling
+
+ PIG-94: changes for M2 of streaming: input/output, ship/cache, error
+ handling
Modified: incubator/pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/PigServer.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/PigServer.java Thu Mar 27 09:54:10 2008
@@ -54,6 +54,7 @@
import org.apache.pig.impl.logicalLayer.parser.ParseException;
import org.apache.pig.impl.logicalLayer.parser.QueryParser;
import org.apache.pig.impl.logicalLayer.schema.TupleSchema;
+import org.apache.pig.impl.streaming.StreamingCommand;
import org.apache.pig.impl.util.WrappedIOException;
@@ -147,6 +148,16 @@
}
/**
+ * Add a path to be skipped while automatically shipping binaries for
+ * streaming.
+ *
+ * @param path path to be skipped
+ */
+ public void addPathToSkip(String path) {
+ pigContext.addPathToSkip(path);
+ }
+
+ /**
* Defines an alias for the given function spec. This
* is useful for functions that require arguments to the
* constructor.
@@ -157,6 +168,16 @@
*/
public void registerFunction(String function, String functionSpec) {
pigContext.registerFunction(function, functionSpec);
+ }
+
+ /**
+ * Defines an alias for the given streaming command.
+ *
+ * @param commandAlias - the new command alias to define
+ * @param command - streaming command to be executed
+ */
+ public void registerStreamingCommand(String commandAlias, StreamingCommand command) {
+ pigContext.registerStreamCmd(commandAlias, command);
}
private URL locateJarFromResources(String jarName) throws IOException {
Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Thu Mar 27 09:54:10 2008
@@ -112,6 +112,9 @@
}
public void init() throws ExecException {
+ // Copy over necessary configuration from PigContext
+ updateConfiguration(pigContext.getConf());
+
//First set the ssh socket factory
setSSHFactory();
@@ -164,6 +167,7 @@
}
try {
+ // Set job-specific configuration knobs
jobClient = new JobClient(new JobConf(conf.getConfiguration()));
}
catch (IOException e) {
Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java Thu Mar 27 09:54:10 2008
@@ -21,6 +21,7 @@
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Map;
+import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -55,6 +56,7 @@
public Class<WritableComparator> userComparator = null;
public String quantilesFile = null;
public PigContext pigContext;
+ public Properties properties = new Properties();
public OperatorKey sourceLogicalKey;
@@ -244,6 +246,8 @@
toMap.set(i, spec);
else
toMap.set(i, toMap.get(i).addSpec(spec));
+
+ properties.putAll(spec.getProperties());
}
public void addReduceSpec(EvalSpec spec){
@@ -251,6 +255,8 @@
toReduce = spec;
else
toReduce = toReduce.addSpec(spec);
+
+ properties.putAll(spec.getProperties());
}
public void visit(POVisitor v) {
Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java Thu Mar 27 09:54:10 2008
@@ -19,13 +19,22 @@
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.EnumSet;
import java.util.List;
+import java.util.Map;
+import java.util.Properties;
import java.util.Random;
+import java.util.Map.Entry;
+import org.apache.commons.httpclient.URIException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.JobClient;
@@ -67,6 +76,8 @@
public static Configuration config = null;
public static HExecutionEngine execEngine = null;
+ public static final String LOG_DIR = "_logs";
+
public static void setConf(Configuration configuration) {
config = configuration;
}
@@ -116,6 +127,7 @@
*/
public boolean launchPig(POMapreduce pom) throws IOException {
JobConf conf = new JobConf(config);
+ setJobProperties(conf, pom);
conf.setJobName(pom.pigContext.getJobName());
boolean success = false;
List<String> funcs = new ArrayList<String>();
@@ -132,11 +144,24 @@
funcs.addAll(pom.toReduce.getFuncs());
}
+ String shipFiles =
+ pom.properties.getProperty("pig.streaming.ship.files");
+ List<String> files = new ArrayList<String>();
+ if (shipFiles != null) {
+ String[] paths = shipFiles.split(",");
+ for (String path : paths) {
+ path = path.trim();
+ if (path.length() > 0) {
+ files.add(path.trim());
+ }
+ }
+ }
+
// create jobs.jar locally and pass it to hadoop
File submitJarFile = File.createTempFile("Job", ".jar");
try {
FileOutputStream fos = new FileOutputStream(submitJarFile);
- JarManager.createJar(fos, funcs, pom.pigContext);
+ JarManager.createJar(fos, funcs, files, pom.pigContext);
log.debug("Job jar size = " + submitJarFile.length());
conf.setJar(submitJarFile.getPath());
String user = System.getProperty("user.name");
@@ -193,6 +218,41 @@
conf.setOutputPath(new Path(pom.outputFileSpec.getFileName()));
conf.set("pig.storeFunc", pom.outputFileSpec.getFuncSpec());
+ // Setup the DistributedCache for this job
+ DistributedCache.createSymlink(conf);
+
+ String cacheFiles =
+ pom.properties.getProperty("pig.streaming.cache.files");
+ if (cacheFiles != null) {
+ String[] paths = cacheFiles.split(",");
+
+ for (String path : paths) {
+ path = path.trim();
+ if (path.length() != 0) {
+ URI uri = null;
+ try {
+ uri = new URI(path);
+ } catch (URISyntaxException ue) {
+ throw new IOException("Invalid cache specification, file doesn't exist: " + path);
+ }
+ DistributedCache.addCacheFile(uri, conf);
+ }
+ }
+ }
+
+ //TODO - Remove this
+ conf.setBoolean("keep.failed.task.files", true);
+
+ // Setup the logs directory for this job
+ String jobOutputFileName = pom.pigContext.getJobOutputFile();
+ if (jobOutputFileName != null && jobOutputFileName.length() > 0) {
+ Path jobOutputFile = new Path(pom.pigContext.getJobOutputFile());
+ conf.set("pig.output.dir",
+ jobOutputFile.getParent().toString());
+ conf.set("pig.streaming.log.dir",
+ new Path(jobOutputFile, LOG_DIR).toString());
+ }
+
//
// Now, actually submit the job (using the submit name)
//
@@ -316,6 +376,7 @@
}
catch (Exception e) {
// Do we need different handling for different exceptions
+ e.printStackTrace();
throw WrappedIOException.wrap(e);
}
finally {
@@ -324,6 +385,20 @@
return success;
}
+ /**
+ * Copy job-specific configuration from the <code>Properties</code>
+ * of the given <code>POMapreduce</code>.
+ *
+ * @param job job configuration
+ * @param pom <code>POMapreduce</code> to be executed
+ */
+ private static void setJobProperties(JobConf job, POMapreduce pom)
+ throws IOException {
+ for (Map.Entry property : pom.properties.entrySet()) {
+ job.set((String)property.getKey(), (String)property.getValue());
+ }
+ }
+
private void getErrorMessages(TaskReport reports[], String type)
{
for (int i = 0; i < reports.length; i++) {
Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java Thu Mar 27 09:54:10 2008
@@ -65,7 +65,7 @@
String evalSpec = job.get("pig.combineFunc", "");
EvalSpec esp = (EvalSpec)ObjectSerializer.deserialize(evalSpec);
if(esp != null) esp.instantiateFunc(pigContext);
- evalPipe = esp.setupPipe(finalout);
+ evalPipe = esp.setupPipe(null, finalout);
//throw new RuntimeException("combine spec: " + evalSpec + " combine pipe: " + esp.toString());
bags = new DataBag[inputCount];
Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigInputFormat.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigInputFormat.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigInputFormat.java Thu Mar 27 09:54:10 2008
@@ -187,7 +187,11 @@
}
myReader.set(this);
loader.bindTo(split.getPath().toString(), new BufferedPositionedInputStream(is, start), start, end);
-
+
+ // Mimic FileSplit
+ job.set("map.input.file", split.getPath().toString());
+ job.setLong("map.input.start", split.getStart());
+ job.setLong("map.input.length", split.getLength());
}
public JobConf getJobConf(){
Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java Thu Mar 27 09:54:10 2008
@@ -23,6 +23,7 @@
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.Iterator;
+import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -84,6 +85,7 @@
public static Reporter reporter = null;
JobConf job;
+ private Properties properties = new Properties();
private DataCollector evalPipe;
private OutputCollector oc;
private EvalSpec group;
@@ -104,7 +106,7 @@
oc = output;
- setupMapPipe(reporter);
+ setupMapPipe(properties, reporter);
// allocate key & value instances that are re-used for all entries
WritableComparable key = input.createKey();
@@ -127,7 +129,7 @@
try {
oc = output;
if (evalPipe == null) {
- setupReducePipe();
+ setupReducePipe(properties);
}
DataBag[] bags = new DataBag[inputCount];
@@ -158,11 +160,29 @@
}
}
- /**
- * Just save off the PigJobConf for later use.
- */
- public void configure(JobConf job) {
- this.job = job;
+ public void configure(JobConf jobConf) {
+ job = jobConf;
+
+ // Set up the pigContext
+ try {
+ pigContext =
+ (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext"));
+ } catch (IOException ioe) {
+ log.fatal("Failed to deserialize PigContext with: " + ioe);
+ throw new RuntimeException(ioe);
+ }
+ pigContext.setJobConf(job);
+
+ // Get properties from the JobConf and save it in the
+ // <code>properties</code> so that it can be used in the Eval pipeline
+ properties.setProperty("pig.output.dir",
+ jobConf.get("pig.output.dir", ""));
+ properties.setProperty("pig.streaming.log.dir",
+ jobConf.get("pig.streaming.log.dir", "_logs"));
+ properties.setProperty("pig.streaming.task.id",
+ jobConf.get("mapred.task.id"));
+ properties.setProperty("pig.streaming.task.output.dir",
+ jobConf.getOutputPath().toString());
}
/**
@@ -191,9 +211,9 @@
return taskidParts[taskidParts.length - 2];
}
- private void setupMapPipe(Reporter reporter) throws IOException {
+ private void setupMapPipe(Properties properties, Reporter reporter)
+ throws IOException {
PigSplit split = PigInputFormat.PigRecordReader.getPigRecordReader().getPigFileSplit();
- pigContext = (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext"));
index = split.getIndex();
EvalSpec evalSpec = split.getEvalSpec();
@@ -216,11 +236,13 @@
}else{
oc = getSplitCollector(splitSpec);
}
- evalPipe = evalSpec.setupPipe(new MapDataOutputCollector());
+ evalPipe = evalSpec.setupPipe(properties,
+ new MapDataOutputCollector());
} else {
group = groupSpec;
- DataCollector groupInput = group.setupPipe(new MapDataOutputCollector());
- evalPipe = evalSpec.setupPipe(groupInput);
+ DataCollector groupInput =
+ group.setupPipe(properties, new MapDataOutputCollector());
+ evalPipe = evalSpec.setupPipe(properties, groupInput);
}
}
@@ -248,8 +270,7 @@
};
}
- private void setupReducePipe() throws IOException {
- pigContext = (PigContext)ObjectSerializer.deserialize(job.get("pig.pigContext"));
+ private void setupReducePipe(Properties properties) throws IOException {
EvalSpec evalSpec = (EvalSpec)ObjectSerializer.deserialize(job.get("pig.reduceFunc", ""));
if (evalSpec == null)
@@ -271,7 +292,8 @@
oc = getSplitCollector(splitSpec);
}
- evalPipe = evalSpec.setupPipe(new ReduceDataOutputCollector());
+ evalPipe = evalSpec.setupPipe(properties,
+ new ReduceDataOutputCollector());
inputCount = ((ArrayList<FileSpec>)ObjectSerializer.deserialize(job.get("pig.inputs"))).size();
Modified: incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POCogroup.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POCogroup.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POCogroup.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POCogroup.java Thu Mar 27 09:54:10 2008
@@ -79,7 +79,8 @@
}
};
- DataCollector inputToSpec = specs.get(i).setupPipe(outputFromSpec);
+ DataCollector inputToSpec = specs.get(i).setupPipe(null,
+ outputFromSpec);
Tuple t;
while ((t = (Tuple) ((PhysicalOperator)opTable.get(inputs[i])).getNext()) != null) {
Modified: incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POEval.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POEval.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POEval.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/local/executionengine/POEval.java Thu Mar 27 09:54:10 2008
@@ -74,7 +74,7 @@
if (buf==null)
buf = new DataBuffer();
if (evalPipeline == null)
- evalPipeline = spec.setupPipe(buf);
+ evalPipeline = spec.setupPipe(null, buf);
inputDrained = false;
Modified: incubator/pig/trunk/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/PigContext.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/PigContext.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/PigContext.java Thu Mar 27 09:54:10 2008
@@ -39,6 +39,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.pig.Main;
import org.apache.pig.PigServer.ExecType;
import org.apache.pig.backend.datastorage.DataStorage;
@@ -50,8 +51,11 @@
import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.MapReduceLauncher;
import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.PigMapReduce;
+import org.apache.pig.backend.hadoop.streaming.HadoopExecutableManager;
import org.apache.pig.backend.local.executionengine.LocalExecutionEngine;
import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
+import org.apache.pig.impl.streaming.ExecutableManager;
+import org.apache.pig.impl.streaming.StreamingCommand;
import org.apache.pig.impl.util.JarManager;
import org.apache.pig.impl.util.WrappedIOException;
@@ -91,15 +95,30 @@
private String jobName = JOB_NAME_PREFIX; // can be overwritten by users
+ // Pig Script Output
+ private String jobOutputFile = "";
+
+ // JobConf of the currently executing Map-Reduce job
+ JobConf jobConf;
+
/**
* a table mapping function names to function specs.
*/
private Map<String, String> definedFunctions = new HashMap<String, String>();
+ /**
+ * a table mapping names to streaming commands.
+ */
+ private Map<String, StreamingCommand> definedCommands =
+ new HashMap<String, StreamingCommand>();
+
private static ArrayList<String> packageImportList = new ArrayList<String>();
public boolean debug = true;
+ // List of paths skipped for automatic shipping
+ List<String> skippedShipPaths = new ArrayList<String>();
+
public PigContext() {
this(ExecType.MAPREDUCE);
}
@@ -118,6 +137,14 @@
}
executionEngine = null;
+
+ // Add the default paths to be skipped for auto-shipping of commands
+ skippedShipPaths.add("/bin");
+ skippedShipPaths.add("/usr/bin");
+ skippedShipPaths.add("/usr/local/bin");
+ skippedShipPaths.add("/sbin");
+ skippedShipPaths.add("/usr/sbin");
+ skippedShipPaths.add("/usr/local/sbin");
}
static{
@@ -341,6 +368,22 @@
}
/**
+ * Defines an alias for the given streaming command.
+ *
+ * This is useful for complicated streaming command specs.
+ *
+ * @param alias - the new command alias to define.
+ * @param command - the command
+ */
+ public void registerStreamCmd(String alias, StreamingCommand command) {
+ if (command == null) {
+ definedCommands.remove(alias);
+ } else {
+ definedCommands.put(alias, command);
+ }
+ }
+
+ /**
* Returns the type of execution currently in effect.
*
* @return
@@ -489,7 +532,103 @@
return instantiateFuncFromSpec(alias);
}
+ /**
+ * Get the {@link StreamingCommand} for the given alias.
+ *
+ * @param alias the alias for the <code>StreamingCommand</code>
+ * @return <code>StreamingCommand</code> for the alias
+ */
+ public StreamingCommand getCommandForAlias(String alias) {
+ return definedCommands.get(alias);
+ }
+
public void setExecType(ExecType execType) {
this.execType = execType;
+ }
+
+ /**
+ * Create a new {@link ExecutableManager} depending on the ExecType.
+ *
+ * @return a new {@link ExecutableManager} depending on the ExecType
+ * @throws ExecException
+ */
+ public ExecutableManager createExecutableManager() throws ExecException {
+ ExecutableManager executableManager = null;
+
+ switch (execType) {
+ case LOCAL:
+ {
+ executableManager = new ExecutableManager();
+ }
+ break;
+ case MAPREDUCE:
+ {
+ executableManager = new HadoopExecutableManager();
+ }
+ break;
+ default:
+ {
+ throw new ExecException("Unkown execType: " + execType);
+ }
+ }
+
+ return executableManager;
+ }
+
+ /**
+ * Get the output file for the current Pig Script.
+ *
+ * @return the output file for the current Pig Script
+ */
+ public String getJobOutputFile() {
+ return jobOutputFile;
+ }
+
+ /**
+ * Set the output file for the current Pig Script.
+ *
+ * @param jobOutputFile the output file for the current Pig Script
+ */
+ public void setJobOutputFile(String jobOutputFile) {
+ this.jobOutputFile = jobOutputFile;
+ }
+
+ /**
+ * Get the <code>JobConf</code> of the current Map-Reduce job.
+ *
+ * @return the <code>JobConf</code> of the current Map-Reduce job
+ */
+ public JobConf getJobConf() {
+ return jobConf;
+ }
+
+ /**
+ * Set the <code>JobConf</code> of the current Map-Reduce job.
+ *
+ * @param jobConf the <code>JobConf</code> of the current Map-Reduce job
+ */
+ public void setJobConf(JobConf jobConf) {
+ this.jobConf = jobConf;
+ }
+
+ /**
+ * Add a path to be skipped while automatically shipping binaries for
+ * streaming.
+ *
+ * @param path path to be skipped
+ */
+ public void addPathToSkip(String path) {
+ skippedShipPaths.add(path);
+ }
+
+ /**
+ * Get paths which are to be skipped while automatically shipping binaries for
+ * streaming.
+ *
+ * @return paths which are to be skipped while automatically shipping binaries
+ * for streaming
+ */
+ public List<String> getPathsToSkip() {
+ return skippedShipPaths;
}
}
Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/BinCondSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/BinCondSpec.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/BinCondSpec.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/BinCondSpec.java Thu Mar 27 09:54:10 2008
@@ -19,6 +19,7 @@
import java.io.IOException;
import java.util.List;
+import java.util.Properties;
import org.apache.pig.data.Datum;
import org.apache.pig.impl.FunctionInstantiator;
@@ -68,7 +69,8 @@
}
@Override
- protected DataCollector setupDefaultPipe(DataCollector endOfPipe) {
+ protected DataCollector setupDefaultPipe(Properties properties,
+ DataCollector endOfPipe) {
return new DataCollector(endOfPipe){
@Override
public void add(Datum d) {
Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/CompositeEvalSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/CompositeEvalSpec.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/CompositeEvalSpec.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/CompositeEvalSpec.java Thu Mar 27 09:54:10 2008
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Properties;
import org.apache.pig.impl.FunctionInstantiator;
import org.apache.pig.impl.eval.collector.DataCollector;
@@ -43,9 +44,10 @@
}
@Override
- protected DataCollector setupDefaultPipe(DataCollector endOfPipe){
+ protected DataCollector setupDefaultPipe(Properties properties,
+ DataCollector endOfPipe){
for (int i=specs.size()-1; i>=0; i--){
- endOfPipe = specs.get(i).setupDefaultPipe(endOfPipe);
+ endOfPipe = specs.get(i).setupDefaultPipe(properties, endOfPipe);
}
return endOfPipe;
}
Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpec.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpec.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpec.java Thu Mar 27 09:54:10 2008
@@ -21,6 +21,7 @@
import java.io.Serializable;
import java.util.Comparator;
import java.util.List;
+import java.util.Properties;
import org.apache.pig.ComparisonFunc;
import org.apache.pig.data.Datum;
@@ -45,13 +46,15 @@
private String comparatorFuncName;
private transient Comparator<Tuple> comparator;
+ protected Properties properties = new Properties();
+
/*
* Keep a precomputed pipeline ready if we do simple evals
* No separate code path for simple evals as earlier
*/
private void init(){
simpleEvalOutput = new DataBuffer();
- simpleEvalInput = setupPipe(simpleEvalOutput);
+ simpleEvalInput = setupPipe(properties, simpleEvalOutput);
}
public class UserComparator implements Comparator<Tuple> {
@@ -88,20 +91,24 @@
/**
* set up a default data processing pipe for processing by this spec
* This pipe does not include unflattening/flattening at the end
+ * @param properties properties for the pipe
* @param endOfPipe The collector where output is desired
* @return The collector where input tuples should be put
*/
- protected abstract DataCollector setupDefaultPipe(DataCollector endOfPipe);
+ protected abstract DataCollector setupDefaultPipe(Properties properties,
+ DataCollector endOfPipe);
/**
* set up a data processing pipe with flattening/unflattening at the end
* based on the isFlattened field
*
+ * @param properties properties for the <code>EvalSpec</code>
* @param endOfPipe where the output is desired
* @return The collector where input tuples should be put
*/
- public DataCollector setupPipe(DataCollector endOfPipe){
+ public DataCollector setupPipe(Properties properties,
+ DataCollector endOfPipe){
/*
* By default tuples flow through the eval pipeline in a flattened fashion
* Thus if flatten is true, we use the default setup pipe method, otherwise we add
@@ -110,10 +117,10 @@
if (isFlattened){
FlattenCollector fc = new FlattenCollector(endOfPipe);
- return setupDefaultPipe(fc);
+ return setupDefaultPipe(properties, fc);
}else{
UnflattenCollector uc = new UnflattenCollector(endOfPipe);
- return setupDefaultPipe(uc);
+ return setupDefaultPipe(properties, uc);
}
}
@@ -245,6 +252,15 @@
this.inner = inner;
}
+ /**
+ * Get properties specific to a given <code>EvalSpec</code>.
+ *
+ * @return properties specific to a given <code>EvalSpec</code>
+ */
+ public Properties getProperties() {
+ return properties;
+ }
+
public abstract void visit(EvalSpecVisitor v);
}
Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/FilterSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/FilterSpec.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/FilterSpec.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/FilterSpec.java Thu Mar 27 09:54:10 2008
@@ -19,6 +19,7 @@
import java.io.IOException;
import java.util.List;
+import java.util.Properties;
import org.apache.pig.data.Datum;
import org.apache.pig.impl.FunctionInstantiator;
@@ -48,7 +49,8 @@
}
@Override
- protected DataCollector setupDefaultPipe(DataCollector endOfPipe) {
+ protected DataCollector setupDefaultPipe(Properties properties,
+ DataCollector endOfPipe) {
return new DataCollector(endOfPipe) {
@Override
Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/FuncEvalSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/FuncEvalSpec.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/FuncEvalSpec.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/FuncEvalSpec.java Thu Mar 27 09:54:10 2008
@@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Iterator;
+import java.util.Properties;
import org.apache.pig.EvalFunc;
import org.apache.pig.Algebraic;
@@ -81,7 +82,8 @@
}
@Override
- protected DataCollector setupDefaultPipe(DataCollector endOfPipe) {
+ protected DataCollector setupDefaultPipe(Properties properties,
+ DataCollector endOfPipe) {
return new DataCollector(endOfPipe){
private Datum getPlaceHolderForFuncOutput(){
Type returnType = func.getReturnType();
Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/GenerateSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/GenerateSpec.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/GenerateSpec.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/GenerateSpec.java Thu Mar 27 09:54:10 2008
@@ -22,6 +22,7 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.Properties;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
@@ -63,7 +64,8 @@
}
@Override
- protected DataCollector setupDefaultPipe(DataCollector endOfPipe) {
+ protected DataCollector setupDefaultPipe(Properties properties,
+ DataCollector endOfPipe) {
return new DataCollector(endOfPipe){
LinkedList<CrossProductItem> pendingCrossProducts = new LinkedList<CrossProductItem>();
@@ -154,7 +156,7 @@
continue;
toBeCrossed[i] = new DatumBag();
- specs.get(i).setupPipe(toBeCrossed[i]).add(cpiInput);
+ specs.get(i).setupPipe(properties, toBeCrossed[i]).add(cpiInput);
}
}
@@ -258,7 +260,7 @@
}
public void exec(){
- specs.get(driver).setupPipe(this).add(cpiInput);
+ specs.get(driver).setupPipe(properties, this).add(cpiInput);
//log.error(Thread.currentThread().getName() + ": Executing driver on " + cpiInput);
successor.markStale(false);
}
Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/SimpleEvalSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/SimpleEvalSpec.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/SimpleEvalSpec.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/SimpleEvalSpec.java Thu Mar 27 09:54:10 2008
@@ -18,6 +18,8 @@
package org.apache.pig.impl.eval;
+import java.util.Properties;
+
import org.apache.pig.data.Datum;
import org.apache.pig.impl.eval.collector.DataCollector;
@@ -25,7 +27,8 @@
public abstract class SimpleEvalSpec extends EvalSpec {
@Override
- protected DataCollector setupDefaultPipe(DataCollector endOfPipe) {
+ protected DataCollector setupDefaultPipe(Properties properties,
+ DataCollector endOfPipe) {
return new DataCollector(endOfPipe){
@Override
public void add(Datum d) {
Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/SortDistinctSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/SortDistinctSpec.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/SortDistinctSpec.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/SortDistinctSpec.java Thu Mar 27 09:54:10 2008
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Properties;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
@@ -56,7 +57,8 @@
}
@Override
- protected DataCollector setupDefaultPipe(DataCollector endOfPipe) {
+ protected DataCollector setupDefaultPipe(Properties properties,
+ DataCollector endOfPipe) {
return new DataCollector(endOfPipe){
@Override
Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/StarSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/StarSpec.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/StarSpec.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/StarSpec.java Thu Mar 27 09:54:10 2008
@@ -19,6 +19,7 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.Properties;
import org.apache.pig.impl.eval.collector.DataCollector;
import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -38,7 +39,8 @@
}
+ /**
+ * Adds no pipeline stage of its own: {@code endOfPipe} is returned
+ * unchanged and {@code properties} is not used.
+ */
@Override
- protected DataCollector setupDefaultPipe(DataCollector endOfPipe) {
+ protected DataCollector setupDefaultPipe(Properties properties,
+ DataCollector endOfPipe) {
return endOfPipe;
}
Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java Thu Mar 27 09:54:10 2008
@@ -19,16 +19,16 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
+import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.pig.LoadFunc;
-import org.apache.pig.StoreFunc;
-import org.apache.pig.impl.streaming.PigExecutableManager;
-import org.apache.pig.builtin.PigStorage;
-import org.apache.pig.data.Datum;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.streaming.ExecutableManager;
+import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.data.Datum;
import org.apache.pig.impl.eval.collector.DataCollector;
import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -38,21 +38,35 @@
private static final Log LOG =
LogFactory.getLog(StreamSpec.class.getName());
- private String streamingCommand; // Actual command to be run
- private String deserializer; // LoadFunc to be used
- private String serializer; // StoreFunc to be used
-
- public StreamSpec(String streamingCommand) {
- this.streamingCommand = streamingCommand;
- this.deserializer = PigStorage.class.getName();
- this.serializer = PigStorage.class.getName();
- }
+ private String executableManager; // ExecutableManager to use
+ private StreamingCommand command; // Actual command to be run
- public StreamSpec(String streamingCommand,
- String deserializer, String serializer) {
- this.streamingCommand = streamingCommand;
- this.deserializer = deserializer;
- this.serializer = serializer;
+    /**
+     * Creates a spec that streams its input through {@code command}.
+     * Only the class name of {@code executableManager} is kept; an instance
+     * is re-created from that name in setupDefaultPipe. The ship list (only
+     * when shipping is enabled on the command) and the cache list are
+     * written into {@code properties} under "pig.streaming.ship.files" and
+     * "pig.streaming.cache.files".
+     */
+    public StreamSpec(ExecutableManager executableManager,
+                      StreamingCommand command) {
+        this.command = command;
+        this.executableManager = executableManager.getClass().getName();
+
+        boolean shipEnabled = command.getShipFiles();
+        if (shipEnabled) {
+            parseShipCacheSpecs(command.getShipSpecs(), properties,
+                                "pig.streaming.ship.files");
+        }
+        parseShipCacheSpecs(command.getCacheSpecs(), properties,
+                            "pig.streaming.cache.files");
+    }
+
+    /**
+     * Stores {@code specs} in {@code properties} under the key
+     * {@code property} as one string, entries separated by ", ".
+     */
+    private static void parseShipCacheSpecs(List<String> specs,
+                                            Properties properties,
+                                            String property) {
+        StringBuffer joined = new StringBuffer();
+        boolean first = true;
+        for (String spec : specs) {
+            if (!first) {
+                joined.append(", ");
+            }
+            joined.append(spec);
+            first = false;
+        }
+        properties.setProperty(property, joined.toString());
}
@Override
@@ -66,15 +80,11 @@
return null;
}
- protected DataCollector setupDefaultPipe(DataCollector endOfPipe) {
- return new StreamDataCollector(streamingCommand,
- (deserializer == null) ? new PigStorage() :
- (LoadFunc)PigContext.instantiateFuncFromSpec(
- deserializer),
- (serializer == null) ? new PigStorage() :
- (StoreFunc)PigContext.instantiateFuncFromSpec(
- serializer),
- endOfPipe);
+    /**
+     * Builds the streaming stage: instantiates the configured
+     * ExecutableManager class by name and wraps it, together with the
+     * command and {@code properties}, in a StreamDataCollector that feeds
+     * {@code endOfPipe}.
+     */
+    protected DataCollector setupDefaultPipe(Properties properties,
+                                             DataCollector endOfPipe) {
+        ExecutableManager manager =
+            (ExecutableManager) PigContext.instantiateFuncFromSpec(executableManager);
+        return new StreamDataCollector(properties, manager, command, endOfPipe);
}
public void visit(EvalSpecVisitor v) {
@@ -82,16 +92,18 @@
}
/**
- * A simple {@link DataCollector} which wraps a {@link PigExecutableManager}
+ * A simple {@link DataCollector} which wraps a {@link ExecutableManager}
* and lets it handle the input and the output to the managed executable.
*/
private static class StreamDataCollector extends DataCollector {
- PigExecutableManager executable; //Executable manager
+ ExecutableManager executableManager; //Executable manager
- public StreamDataCollector(String streamingCommand,
- LoadFunc deserializer, StoreFunc serializer,
+ public StreamDataCollector(Properties properties,
+ ExecutableManager executableManager,
+ StreamingCommand command,
DataCollector endOfPipe) {
super(endOfPipe);
+ this.executableManager = executableManager;
DataCollector successor =
new DataCollector(endOfPipe) {
@@ -102,33 +114,32 @@
};
try {
- // Create the PigExecutableManager
- executable = new PigExecutableManager(streamingCommand,
- deserializer, serializer,
- successor);
-
- executable.configure();
+ // Setup the ExecutableManager
+ this.executableManager.configure(properties, command, successor);
// Start the executable
- executable.run();
+ this.executableManager.run();
} catch (Exception e) {
- LOG.fatal("Failed to create/start PigExecutableManager with: " + e);
+ LOG.fatal("Failed to create/start PigExecutableManager with: " +
+ e);
+ e.printStackTrace();
throw new RuntimeException(e);
}
}
public void add(Datum d) {
try {
- executable.add(d);
+ executableManager.add(d);
} catch (IOException ioe) {
- LOG.fatal("executable.add(" + d + ") failed with: " + ioe);
+ LOG.fatal("ExecutableManager.add(" + d + ") failed with: " +
+ ioe);
throw new RuntimeException(ioe);
}
}
protected void finish() {
try {
- executable.close();
+ executableManager.close();
}
catch (Exception e) {
LOG.fatal("Failed to close PigExecutableManager with: " + e);
Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/window/TimeWindowSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/window/TimeWindowSpec.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/window/TimeWindowSpec.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/window/TimeWindowSpec.java Thu Mar 27 09:54:10 2008
@@ -20,6 +20,7 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.Properties;
import org.apache.pig.data.Datum;
import org.apache.pig.data.TimestampedTuple;
@@ -40,7 +41,8 @@
}
@Override
- protected DataCollector setupDefaultPipe(DataCollector endOfPipe) {
+ protected DataCollector setupDefaultPipe(Properties properties,
+ DataCollector endOfPipe) {
return new DataCollector(endOfPipe) {
@Override
Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/window/TupleWindowSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/window/TupleWindowSpec.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/window/TupleWindowSpec.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/window/TupleWindowSpec.java Thu Mar 27 09:54:10 2008
@@ -37,7 +37,8 @@
}
@Override
- protected DataCollector setupDefaultPipe(DataCollector endOfPipe) {
+ protected DataCollector setupDefaultPipe(Properties properties,
+ DataCollector endOfPipe) {
return new DataCollector(endOfPipe) {
@Override
Modified: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Thu Mar 27 09:54:10 2008
@@ -44,6 +44,8 @@
import org.apache.pig.builtin.*;
import org.apache.pig.impl.builtin.*;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
public class QueryParser {
private PigContext pigContext;
@@ -241,9 +243,121 @@
+ /**
+ * Adds {@code cond} to {@code splitOp}, creates the LOSplitOutput for output
+ * number {@code index}, and binds {@code alias} to a new LogicalPlan rooted
+ * at that operator.
+ */
void addSplitOutput(LogicalPlan lp, LOSplit splitOp, String alias, Cond cond, int index){
splitOp.addCond(cond);
LOSplitOutput splitOut = new LOSplitOutput(opTable, scope, getNextId(), lp.getRoot(), index);
- aliases.put(alias, new LogicalPlan(splitOut.getOperatorKey(), opTable, pigContext));
+ aliases.put(alias, new LogicalPlan(splitOut.getOperatorKey(), opTable, pigContext));
}
+    // Check and set files to be automatically shipped for the given StreamingCommand
+    // Auto-shipping rules:
+    // 1. If the command begins with either perl or python (compared on the
+    //    file name, so e.g. /usr/bin/perl qualifies too) assume that the
+    //    script is the first non-quoted string it encounters that does not
+    //    start with dash - subject to restrictions in (2).
+    // 2. Otherwise, attempt to ship the first string from the command line as
+    //    long as it does not come from /bin, /usr/bin, /usr/local/bin.
+    //    It will determine that by scanning the path if an absolute path is
+    //    provided or by executing "which". The paths can be made configurable
+    //    via "set stream.skippath <paths>" option.
+    private static final String PERL = "perl";
+    private static final String PYTHON = "python";
+
+    /**
+     * Applies the auto-shipping rules above to a freshly split command line.
+     *
+     * @param command the command on which ship paths and the executable are set
+     * @param argv    the split command line; argv[0] may be rewritten to the
+     *                task-relative path of a shipped executable
+     */
+    private void checkAutoShipSpecs(StreamingCommand command, String[] argv) {
+        if (argv.length == 0) {
+            // Nothing to inspect (and argv[0] does not exist)
+            return;
+        }
+        String arg0 = argv[0];
+        // Compare on the file name only, so absolute interpreter paths match
+        String program = arg0.substring(arg0.lastIndexOf('/') + 1);
+        if (program.equalsIgnoreCase(PERL) || program.equalsIgnoreCase(PYTHON)) {
+            for (int i = 1; i < argv.length; ++i) {
+                if (!argv[i].startsWith("-") && !isQuotedString(argv[i])) {
+                    // The script obeys the same skip-path restriction as (2)
+                    if (checkAndShip(command, argv[i])) {
+                        command.addPathToShip(argv[i]);
+                    }
+                    command.setExecutable(argv[i]);
+                    break;
+                }
+            }
+        } else {
+            // Ship the first argument if it can be ...
+            boolean absPath = arg0.startsWith("/");
+            String filePath = absPath ? arg0 : which(arg0);
+            if (filePath != null && filePath.length() > 0
+                    && checkAndShip(command, filePath)) {
+                // Make it relative to task's cwd
+                String runtimeExecutablePath = absPath ? ("." + filePath) : filePath;
+                argv[0] = runtimeExecutablePath;
+                command.setCommandArgs(argv);
+                command.setExecutable(runtimeExecutablePath);
+
+                // Ship the file
+                command.addPathToShip(filePath);
+            }
+        }
+    }
+
+    /**
+     * Returns true if {@code s} is wrapped in a matching pair of single or
+     * double quotes (splitArgs keeps the quotes on quoted arguments).
+     * Null, empty and one-character strings are never considered quoted.
+     */
+    private boolean isQuotedString(String s) {
+        if (s == null || s.length() < 2) {
+            // Too short to hold both an opening and a closing quote
+            return false;
+        }
+        char first = s.charAt(0);
+        return (first == '\'' || first == '"') && s.charAt(s.length() - 1) == first;
+    }
+
+    /**
+     * Returns false when {@code file} starts with any prefix returned by
+     * {@code pigContext.getPathsToSkip()}, true otherwise.
+     * {@code command} is not consulted.
+     */
+    private boolean checkAndShip(StreamingCommand command, String file) {
+        boolean skipped = false;
+        for (String prefix : pigContext.getPathsToSkip()) {
+            if (file.startsWith(prefix)) {
+                skipped = true;
+                break;
+            }
+        }
+        return !skipped;
+    }
+
+
+    /**
+     * Resolves {@code file} by running "which".
+     *
+     * @return the first line printed by "which" when it exits with status 0
+     *         and printed something; otherwise the empty string (never null)
+     */
+    private static String which(String file) {
+        Process process = null;
+        try {
+            ProcessBuilder processBuilder = new ProcessBuilder(new String[] {"which", file});
+            process = processBuilder.start();
+
+            BufferedReader stdout = new BufferedReader(new InputStreamReader(process.getInputStream()));
+            String fullPath;
+            try {
+                fullPath = stdout.readLine();
+            } finally {
+                stdout.close();
+            }
+
+            if (process.waitFor() == 0 && fullPath != null) {
+                return fullPath;
+            }
+        } catch (InterruptedException e) {
+            // Preserve the interrupt status for callers further up the stack
+            Thread.currentThread().interrupt();
+        } catch (Exception e) {
+            // Best effort: "which" missing or failing means "not found"
+        } finally {
+            if (process != null) {
+                process.destroy();
+            }
+        }
+        return "";
+    }
+
+    private static final char SINGLE_QUOTE = '\'';
+    private static final char DOUBLE_QUOTE = '"';
+
+    /**
+     * Splits a streaming command line into its arguments.
+     *
+     * Arguments are separated by runs of whitespace. An argument starting
+     * with a single or double quote extends to the next matching quote and
+     * is returned with its quotes still attached. Leading and trailing
+     * whitespace is ignored.
+     *
+     * @throws ParseException if a quote is never closed or the command
+     *         contains no arguments at all
+     */
+    private static String[] splitArgs(String command) throws ParseException {
+        List<String> argv = new ArrayList<String>();
+        int length = command.length();
+        int beginIndex = 0;
+
+        while (true) {
+            // Skip whitespace, stopping cleanly at the end of the line
+            while (beginIndex < length && Character.isWhitespace(command.charAt(beginIndex))) {
+                ++beginIndex;
+            }
+            if (beginIndex >= length) {
+                break;
+            }
+
+            char first = command.charAt(beginIndex);
+            int endIndex;
+            if (first == SINGLE_QUOTE || first == DOUBLE_QUOTE) {
+                endIndex = command.indexOf(first, beginIndex + 1);
+                if (endIndex == -1) {
+                    // Didn't find the ending quote/double-quote
+                    throw new ParseException("Illegal command: " + command);
+                }
+                // Keep the quotes as part of the argument
+                argv.add(command.substring(beginIndex, endIndex + 1));
+            } else {
+                // Unquoted argument ends at the next whitespace of any kind
+                endIndex = beginIndex + 1;
+                while (endIndex < length && !Character.isWhitespace(command.charAt(endIndex))) {
+                    ++endIndex;
+                }
+                argv.add(command.substring(beginIndex, endIndex));
+            }
+            beginIndex = endIndex + 1;
+        }
+
+        if (argv.isEmpty()) {
+            // Callers index argv[0]; an empty command cannot be executed
+            throw new ParseException("Illegal command: " + command);
+        }
+        return argv.toArray(new String[argv.size()]);
+    }
+
}
@@ -387,8 +501,15 @@
TOKEN : { <EVAL : "eval"> }
TOKEN : { <STREAM : "stream"> }
TOKEN : { <THROUGH : "through"> }
-TOKEN : { <BACKTICK : "`"> }
TOKEN : { <STORE : "store"> }
+// Keywords for streaming commands: SHIP, CACHE, INPUT, OUTPUT and ERROR
+// (which matches "stderr") are used by DefineClause, STDIN/STDOUT by
+// CommandStream and LIMIT by ErrorSpec.
+// NOTE(review): as keywords these words presumably can no longer serve as
+// aliases (TestStreaming renames its INPUT/OUTPUT aliases in this commit);
+// confirm against the parser's IGNORE_CASE setting.
+TOKEN : { <SHIP: "ship"> }
+TOKEN : { <CACHE: "cache"> }
+TOKEN : { <INPUT: "input"> }
+TOKEN : { <OUTPUT: "output"> }
+TOKEN : { <ERROR: "stderr"> }
+TOKEN : { <STDIN: "stdin"> }
+TOKEN : { <STDOUT: "stdout"> }
+TOKEN : { <LIMIT: "limit"> }
TOKEN:
{
@@ -1254,43 +1375,104 @@
}
}
+// Parses "<input> THROUGH <command>" (the STREAM keyword is presumably
+// consumed by the caller) and wraps the input in an LOEval whose StreamSpec
+// runs the command with the ExecutableManager returned by
+// pigContext.createExecutableManager().
-LogicalOperator StreamClause(): {LogicalOperator input; String streamingCommand;}
+LogicalOperator StreamClause():
+{
+ LogicalOperator input;
+ StreamingCommand command;
+}
{
input = NestedExpr()
- <THROUGH> streamingCommand = StreamingCommand()
+ <THROUGH> command = Command()
{
- return new LOEval(opTable, scope, getNextId(), input.getOperatorKey(), new StreamSpec(streamingCommand));
+ return new LOEval(opTable, scope, getNextId(), input.getOperatorKey(),
+ new StreamSpec(pigContext.createExecutableManager(),
+ command)
+ );
}
}
+// A streaming command is either an inline EXECCOMMAND, which is split into
+// arguments and run through checkAutoShipSpecs, or an IDENTIFIER naming a
+// command looked up with pigContext.getCommandForAlias (presumably one
+// registered by DefineClause); an unknown name raises a ParseException.
-String StreamingCommand(): {Token t;}
+StreamingCommand Command(): {Token t; StreamingCommand command;}
{
t = <EXECCOMMAND>
{
- return unquote(t.image);
+ String[] argv = splitArgs(unquote(t.image));
+ command = new StreamingCommand(argv);
+ checkAutoShipSpecs(command, argv);
+ return command;
+ }
+ |
+ t = <IDENTIFIER>
+ {
+ command = pigContext.getCommandForAlias(t.image);
+ if (command == null) {
+ throw new ParseException("Undefined command-alias: " + t.image +
+ " used as stream operator");
+ }
+
+ return command;
}
}
+// DEFINE <alias> binds either
+//  - a streaming command: an EXECCOMMAND followed by any number of SHIP(...),
+//    CACHE(...), INPUT(...), OUTPUT(...) and STDERR(...) clauses, registered
+//    with pigContext.registerStreamCmd; an empty SHIP() disables shipping; or
+//  - a function spec "<func>(<args>)", registered with registerFunction.
+// Returns a dummy LODefine.
+// NOTE(review): unlike inline commands in Command(), DEFINEd commands are
+// not passed through checkAutoShipSpecs -- confirm this is intended.
-LogicalOperator DefineClause() : {Token t; Token t1; String functionName, functionArgs;}
+LogicalOperator DefineClause() : {Token t; Token cmd; String functionName, functionArgs;}
{
- t = <IDENTIFIER>
- (
+ t = <IDENTIFIER>
+ (
+ (
+ cmd = <EXECCOMMAND>
+ {
+ StreamingCommand command =
+ new StreamingCommand(splitArgs(unquote(cmd.image)));
+ String[] paths;
+ }
+ (
+ <SHIP> "(" paths = PathList() ")"
+ {
+ if (paths.length == 0) {
+ command.setShipFiles(false);
+ } else {
+ for (String path : paths) {
+ command.addPathToShip(path);
+ }
+ }
+ }
+ |
+ <CACHE> "(" paths = PathList() ")"
+ {
+ for (String path : paths) {
+ command.addPathToCache(path);
+ }
+ }
+ |
+ <INPUT> "(" InputOutputSpec(command, StreamingCommand.Handle.INPUT) ")"
+ |
+ <OUTPUT> "(" InputOutputSpec(command, StreamingCommand.Handle.OUTPUT) ")"
+ |
+ <ERROR> "(" ErrorSpec(command, t.image) ")"
+ )*
+ {
+ pigContext.registerStreamCmd(t.image, command);
+ }
+ )
+ |
+ (
functionName = QualifiedFunction() "(" functionArgs = StringList() ")"
{
- pigContext.registerFunction(t.image,
- (functionName + "(" + functionArgs + ")"));
+ pigContext.registerFunction(t.image,
+ (functionName + "(" + functionArgs + ")"));
}
)
+ )
{
- // Return the dummy LODefine
- return new LODefine(opTable, scope, getNextId());
+ // Return the dummy LODefine
+ return new LODefine(opTable, scope, getNextId());
}
}
LogicalOperator StoreClause() : {LogicalOperator lo; Token t; String fileName; String functionSpec = null; String functionName, functionArgs;}
{
- t = <IDENTIFIER> <INTO> fileName = FileName()
+ t = <IDENTIFIER> <INTO> fileName = FileName()
(
<USING> functionName = QualifiedFunction()
{functionSpec = functionName;}
@@ -1305,12 +1487,79 @@
}
LogicalPlan readFrom = aliases.get(t.image);
-
+ String jobOutputFile = massageFilename(fileName, pigContext, false);
lo = new LOStore(opTable, scope, getNextId(), readFrom.getRoot(),
- new FileSpec(massageFilename(fileName, pigContext, false),
- functionSpec),
+ new FileSpec(jobOutputFile, functionSpec),
false);
-
+ pigContext.setJobOutputFile(jobOutputFile);
+
return lo;
}
+}
+
+// Parses an optional comma-separated list of quoted paths and returns them
+// unquoted; an empty list yields a zero-length array.
+String[] PathList() :
+{
+    Token t;
+    List<String> pathList = new ArrayList<String>();
+}
+{
+    [
+        t = <QUOTEDSTRING> { pathList.add(unquote(t.image)); }
+        ( "," t = <QUOTEDSTRING> { pathList.add(unquote(t.image)); } )*
+    ]
+    { return pathList.toArray(new String[pathList.size()]); }
+}
+
+// Parses one or more comma-separated "<stream> USING <func>(<args>)" entries
+// and records each on the command as a HandleSpec for the given handle.
+void InputOutputSpec(StreamingCommand command, StreamingCommand.Handle handle): {}
+{
+    HandleSpecEntry(command, handle)
+    ( "," HandleSpecEntry(command, handle) )*
+}
+
+// Parses a single "<stream> USING <func>(<args>)" entry and adds it to the
+// command. The function spec is presumably a serializer for an INPUT handle
+// and a deserializer for an OUTPUT handle (confirm against StreamingCommand),
+// so it is held under a neutral name here.
+void HandleSpecEntry(StreamingCommand command, StreamingCommand.Handle handle):
+{
+    String stream;
+    String functionName, functionArgs;
+}
+{
+    stream = CommandStream() <USING> functionName = QualifiedFunction() "(" functionArgs = StringList() ")"
+    {
+        String funcSpec = functionName + "(" + functionArgs + ")";
+        command.addHandleSpec(handle, new HandleSpec(stream, funcSpec));
+    }
+}
+
+// A command stream is "stdin", "stdout" or a quoted name, returned unquoted.
+String CommandStream(): {Token t;}
+{
+    <STDIN> { return "stdin"; }
+  | <STDOUT> { return "stdout"; }
+  | t = <QUOTEDSTRING> { return unquote(t.image); }
+}
+
+// STDERR clause body: either a quoted log directory with an optional
+// "LIMIT <n>" (default StreamingCommand.MAX_TASKS), or nothing, in which
+// case the log directory is the command's alias.
+void ErrorSpec(StreamingCommand command, String alias):
+{
+    Token dir;
+    Token count;
+    int limit = StreamingCommand.MAX_TASKS;
+}
+{
+    dir = <QUOTEDSTRING>
+    [ <LIMIT> count = <NUMBER> { limit = Integer.parseInt(count.image); } ]
+    {
+        command.setLogDir(unquote(dir.image));
+        command.setLogFilesLimit(limit);
+    }
+  |
+    {
+        command.setLogDir(alias);
+    }
}
Modified: incubator/pig/trunk/src/org/apache/pig/impl/streaming/PigExecutableManager.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/PigExecutableManager.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/streaming/PigExecutableManager.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/PigExecutableManager.java Thu Mar 27 09:54:10 2008
@@ -1,297 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.impl.streaming;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.BufferedReader;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.pig.LoadFunc;
-import org.apache.pig.StoreFunc;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.Datum;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.eval.collector.DataCollector;
-import org.apache.pig.impl.io.BufferedPositionedInputStream;
-import org.apache.pig.impl.logicalLayer.parser.ParseException;
-
-/**
- * {@link PigExecutableManager} manages an external executable which processes data
- * in a Pig query.
- *
- * The <code>PigExecutableManager</code> is responsible for startup/teardown of the
- * external process and also for managing it.
- * It feeds input records to the executable via it's <code>stdin</code>,
- * collects the output records from the <code>stdout</code> and also diagnostic
- * information from the <code>stdout</code>.
- */
-public class PigExecutableManager {
- private static final Log LOG =
- LogFactory.getLog(PigExecutableManager.class.getName());
-
- String command; // Streaming command to be run
- String[] argv; // Parsed/split commands
-
- Process process; // Handle to the process
- private static int SUCCESS = 0;
-
- protected DataOutputStream stdin; // stdin of the process
- StoreFunc serializer; // serializer to be used to
- // send data to the process
-
- ProcessOutputThread stdoutThread; // thread to get process output
- InputStream stdout; // stdout of the process
- LoadFunc deserializer; // deserializer to be used to
- // interpret the process' output
-
- ProcessErrorThread stderrThread; // thread to get process output
- InputStream stderr; // stderr of the process
-
- DataCollector endOfPipe;
-
- public PigExecutableManager(String command,
- LoadFunc deserializer, StoreFunc serializer,
- DataCollector endOfPipe) throws Exception {
- this.command = command;
-
- this.argv = splitArgs(this.command);
- if (LOG.isDebugEnabled()) {
- for (String cmd : argv) {
- LOG.debug("argv: " + cmd);
- }
- }
-
- this.deserializer = deserializer;
- this.serializer = serializer;
- this.endOfPipe = endOfPipe;
- }
-
- private static final char SINGLE_QUOTE = '\'';
- private static final char DOUBLE_QUOTE = '"';
- private static String[] splitArgs(String command) throws Exception {
- List<String> argv = new ArrayList<String>();
-
- int beginIndex = 0;
-
- while (beginIndex < command.length()) {
- // Skip spaces
- while (Character.isWhitespace(command.charAt(beginIndex))) {
- ++beginIndex;
- }
-
- char delim = ' ';
- char charAtIndex = command.charAt(beginIndex);
- if (charAtIndex == SINGLE_QUOTE || charAtIndex == DOUBLE_QUOTE) {
- delim = charAtIndex;
- }
-
- int endIndex = command.indexOf(delim, beginIndex+1);
- if (endIndex == -1) {
- if (Character.isWhitespace(delim)) {
- // Reached end of command-line
- argv.add(command.substring(beginIndex));
- break;
- } else {
- // Didn't find the ending quote/double-quote
- throw new ParseException("Illegal command: " + command);
- }
- }
-
- if (Character.isWhitespace(delim)) {
- // Do not consume the space
- argv.add(command.substring(beginIndex, endIndex));
- } else {
- // Do not consume the quotes
- argv.add(command.substring(beginIndex+1, endIndex));
- }
-
- beginIndex = endIndex + 1;
- }
-
- return argv.toArray(new String[0]);
- }
-
- public void configure() {
- }
-
- public void close() throws Exception {
- // Close the stdin to let the process terminate
- stdin.flush();
- stdin.close();
- stdin = null;
-
- // Wait for the process to exit and the stdout/stderr threads to complete
- int exitCode = -1;
- try {
- exitCode = process.waitFor();
-
- if (stdoutThread != null) {
- stdoutThread.join(0);
- }
- if (stderrThread != null) {
- stderrThread.join(0);
- }
-
- } catch (InterruptedException ie) {}
-
- // Clean up the process
- process.destroy();
-
- LOG.debug("Process exited with: " + exitCode);
- if (exitCode != SUCCESS) {
- throw new ExecException(command + " failed with exit status: " +
- exitCode);
- }
- }
-
- public void run() throws IOException {
- // Run the executable
- ProcessBuilder processBuilder = new ProcessBuilder(argv);
- process = processBuilder.start();
- LOG.debug("Started the process for command: " + command);
-
- // Pick up the process' stdin/stdout/stderr streams
- stdin =
- new DataOutputStream(new BufferedOutputStream(process.getOutputStream()));
- stdout =
- new DataInputStream(new BufferedInputStream(process.getInputStream()));
- stderr =
- new DataInputStream(new BufferedInputStream(process.getErrorStream()));
-
- // Attach the serializer to the stdin of the process for sending tuples
- serializer.bindTo(stdin);
-
- // Attach the deserializer to the stdout of the process to get tuples
- deserializer.bindTo("", new BufferedPositionedInputStream(stdout), 0,
- Long.MAX_VALUE);
-
- // Start the threads to process the executable's stdout and stderr
- stdoutThread = new ProcessOutputThread(deserializer);
- stdoutThread.start();
- stderrThread = new ProcessErrorThread();
- stderrThread.start();
- }
-
- public void add(Datum d) throws IOException {
- // Pass the serialized tuple to the executable
- serializer.putNext((Tuple)d);
- stdin.flush();
- }
-
- /**
- * Workhorse to process the stdout of the managed process.
- *
- * The <code>PigExecutableManager</code>, by default, just pushes the received
- * <code>Datum</code> into eval-pipeline to be processed by the successor.
- *
- * @param d <code>Datum</code> to process
- */
- protected void processOutput(Datum d) {
- endOfPipe.add(d);
- }
-
- class ProcessOutputThread extends Thread {
-
- LoadFunc deserializer;
-
- ProcessOutputThread(LoadFunc deserializer) {
- setDaemon(true);
- this.deserializer = deserializer;
- }
-
- public void run() {
- try {
- // Read tuples from the executable and push them down the pipe
- Tuple tuple = null;
- while ((tuple = deserializer.getNext()) != null) {
- processOutput(tuple);
- }
-
- if (stdout != null) {
- stdout.close();
- LOG.debug("ProcessOutputThread done");
- }
- } catch (Throwable th) {
- LOG.warn(th);
- try {
- if (stdout != null) {
- stdout.close();
- }
- } catch (IOException ioe) {
- LOG.info(ioe);
- }
- throw new RuntimeException(th);
- }
- }
- }
-
- /**
- * Workhorse to process the stderr stream of the managed process.
- *
- * By default <code>PigExecuatbleManager</code> just sends out the received
- * error message to the <code>stderr</code> of itself.
- *
- * @param error error message from the managed process.
- */
- protected void processError(String error) {
- // Just send it out to our stderr
- System.err.println(error);
- }
-
- class ProcessErrorThread extends Thread {
-
- public ProcessErrorThread() {
- setDaemon(true);
- }
-
- public void run() {
- try {
- String error;
- BufferedReader reader =
- new BufferedReader(new InputStreamReader(stderr));
- while ((error = reader.readLine()) != null) {
- processError(error);
- }
-
- if (stderr != null) {
- stderr.close();
- LOG.debug("ProcessErrorThread done");
- }
- } catch (Throwable th) {
- LOG.warn(th);
- try {
- if (stderr != null) {
- stderr.close();
- }
- } catch (IOException ioe) {
- LOG.info(ioe);
- throw new RuntimeException(th);
- }
- }
- }
- }
-}
Modified: incubator/pig/trunk/src/org/apache/pig/impl/util/JarManager.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/util/JarManager.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/util/JarManager.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/util/JarManager.java Thu Mar 27 09:54:10 2008
@@ -90,11 +90,15 @@
* @param funcs
* the functions that will be used in a job and whose jar files need to be included
* in the final merged jar file.
+ * @param files files which will be used in the job and need to be shipped
* @return the temporary path to the merged jar file.
* @throws ClassNotFoundException
* @throws IOException
*/
- public static void createJar(OutputStream os, List<String> funcs, PigContext pigContext) throws ClassNotFoundException, IOException {
+ public static void createJar(OutputStream os,
+ List<String> funcs, List<String> files,
+ PigContext pigContext)
+ throws ClassNotFoundException, IOException {
Vector<JarListEntry> jarList = new Vector<JarListEntry>();
for(String toSend: pigPackagesToSend) {
addContainingJar(jarList, PigMapReduce.class, toSend, pigContext);
@@ -126,6 +130,11 @@
jarFile.putNextEntry(new ZipEntry("pigContext"));
new ObjectOutputStream(jarFile).writeObject(pigContext);
}
+
+ for (String file : files) {
+ addStream(jarFile, file, new FileInputStream(file), contents);
+ }
+
jarFile.close();
}
Modified: incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java Thu Mar 27 09:54:10 2008
@@ -157,6 +157,9 @@
//mPigServer.setJobName(unquote(value));
mPigServer.setJobName(value);
}
+ else if (key.equals("stream.skippath")) {
+ mPigServer.addPathToSkip(value);
+ }
else
{
// other key-value pairs can go there
Modified: incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java?rev=641888&r1=641887&r2=641888&view=diff
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java (original)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java Thu Mar 27 09:54:10 2008
@@ -18,6 +18,9 @@
package org.apache.pig.test;
import java.io.File;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
import org.junit.Assert;
import org.junit.Test;
@@ -25,6 +28,9 @@
import org.apache.pig.PigServer;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.*;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.io.FileLocalizer;
+
import static org.apache.pig.PigServer.ExecType.MAPREDUCE;
import junit.framework.TestCase;
@@ -36,7 +42,7 @@
private static final String simpleEchoStreamingCommand =
"perl -ne 'chomp $_; print \"$_\n\"'";
- private Tuple[] setupExpectedResults(String[] firstField, int[] secondField) {
+ private Tuple[] setupExpectedResults(String[] firstField, int[] secondField) {
Assert.assertEquals(firstField.length, secondField.length);
Tuple[] expectedResults = new Tuple[firstField.length];
@@ -65,14 +71,14 @@
setupExpectedResults(expectedFirstFields, expectedSecondFields);
// Pig query to run
- pigServer.registerQuery("INPUT = load 'file:" + input + "' using " +
+ pigServer.registerQuery("IP = load 'file:" + input + "' using " +
PigStorage.class.getName() + "(',');");
- pigServer.registerQuery("FILTERED_DATA = filter INPUT by $1 > '3';");
- pigServer.registerQuery("OUTPUT = stream FILTERED_DATA through `" +
+ pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
+ pigServer.registerQuery("OP = stream FILTERED_DATA through `" +
simpleEchoStreamingCommand + "`;");
// Run the query and check the results
- Util.checkQueryOutputs(pigServer.openIterator("OUTPUT"), expectedResults);
+ Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
}
@Test
@@ -91,15 +97,15 @@
setupExpectedResults(expectedFirstFields, expectedSecondFields);
// Pig query to run
- pigServer.registerQuery("INPUT = load 'file:" + input + "' using " +
+ pigServer.registerQuery("IP = load 'file:" + input + "' using " +
PigStorage.class.getName() + "(',');");
- pigServer.registerQuery("FILTERED_DATA = filter INPUT by $1 > '3';");
+ pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
pigServer.registerQuery("STREAMED_DATA = stream FILTERED_DATA through `" +
simpleEchoStreamingCommand + "` as (f0, f1);");
- pigServer.registerQuery("OUTPUT = filter STREAMED_DATA by f1 > '6';");
+ pigServer.registerQuery("OP = filter STREAMED_DATA by f1 > '6';");
// Run the query and check the results
- Util.checkQueryOutputs(pigServer.openIterator("OUTPUT"), expectedResults);
+ Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
}
@Test
@@ -118,17 +124,17 @@
setupExpectedResults(expectedFirstFields, expectedSecondFields);
// Pig query to run
- pigServer.registerQuery("INPUT = load 'file:" + input + "' using " +
+ pigServer.registerQuery("IP = load 'file:" + input + "' using " +
PigStorage.class.getName() + "(',');");
- pigServer.registerQuery("FILTERED_DATA = filter INPUT by $1 > '3';");
+ pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
pigServer.registerQuery("GROUPED_DATA = group FILTERED_DATA by $0;");
pigServer.registerQuery("FLATTENED_GROUPED_DATA = foreach GROUPED_DATA " +
"generate flatten($1);");
- pigServer.registerQuery("OUTPUT = stream FLATTENED_GROUPED_DATA through `" +
+ pigServer.registerQuery("OP = stream FLATTENED_GROUPED_DATA through `" +
simpleEchoStreamingCommand + "`;");
// Run the query and check the results
- Util.checkQueryOutputs(pigServer.openIterator("OUTPUT"), expectedResults);
+ Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
}
@Test
@@ -160,19 +166,206 @@
setupExpectedResults(expectedFirstFields, expectedSecondFields);
// Pig query to run
- pigServer.registerQuery("INPUT = load 'file:" + input + "' using " +
+ pigServer.registerQuery("IP = load 'file:" + input + "' using " +
PigStorage.class.getName() + "(',');");
- pigServer.registerQuery("FILTERED_DATA = filter INPUT by $1 > '3';");
- pigServer.registerQuery("GROUPED_DATA = group INPUT by $0;");
+ pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
+ pigServer.registerQuery("GROUPED_DATA = group IP by $0;");
pigServer.registerQuery("ORDERED_DATA = foreach GROUPED_DATA { " +
- " D = order INPUT BY $2, $3;" +
+ " D = order IP BY $2, $3;" +
" generate flatten(D);" +
"};");
- pigServer.registerQuery("OUTPUT = stream ORDERED_DATA through `" +
+ pigServer.registerQuery("OP = stream ORDERED_DATA through `" +
simpleEchoStreamingCommand + "`;");
// Run the query and check the results
- Util.checkQueryOutputs(pigServer.openIterator("OUTPUT"), expectedResults);
+ Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
}
+
+ @Test
+ public void testInputShipSpecs() throws Exception {
+ PigServer pigServer = new PigServer(MAPREDUCE);
+
+ File input = Util.createInputFile("tmp", "",
+ new String[] {"A,1", "B,2", "C,3",
+ "D,2", "A,5", "B,5",
+ "C,8", "A,8", "D,8",
+ "A,9"});
+
+ // Perl script: opens the file named by its first argument (ARGV[0]) and echoes each line to STDOUT, plus a prefixed copy to STDERR
+ String[] script =
+ new String[] {
+ "#!/usr/bin/perl",
+ "open(INFILE, $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";",
+ "while (<INFILE>) {",
+ " chomp $_;",
+ " print STDOUT \"$_\n\";",
+ " print STDERR \"STDERR: $_\n\";",
+ "}",
+ };
+ File command = Util.createInputFile("script", "pl", script);
+
+ // Expected results: the input rows that pass the filter $1 > '3' applied below
+ String[] expectedFirstFields =
+ new String[] {"A", "B", "C", "A", "D", "A"};
+ int[] expectedSecondFields = new int[] {5, 5, 8, 8, 8, 9};
+ Tuple[] expectedResults =
+ setupExpectedResults(expectedFirstFields, expectedSecondFields);
+
+ // Pig query to run: CMD runs the shipped script with argument 'foo'; input('foo' ...) declares 'foo' as the file holding the command's input (PigStorage, ','), and stderr() is declared for CMD
+ pigServer.registerQuery(
+ "define CMD `." + command + " foo` " +
+ "ship ('" + command + "') " +
+ "input('foo' using " + PigStorage.class.getName() + "(',')) " +
+ "stderr();");
+
+ pigServer.registerQuery("IP = load 'file:" + input + "' using " +
+ PigStorage.class.getName() + "(',');");
+ pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
+ pigServer.registerQuery("OP = stream FILTERED_DATA through CMD;");
+
+ String output = "/pig/out";
+ pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
+
+ InputStream op = FileLocalizer.open(output, pigServer.getPigContext());
+ PigStorage ps = new PigStorage(",");
+ ps.bindTo("", new BufferedPositionedInputStream(op), 0, Long.MAX_VALUE);
+ List<Tuple> outputs = new ArrayList<Tuple>();
+ Tuple t;
+ while ((t = ps.getNext()) != null) {
+ outputs.add(t);
+ }
+
+ // Compare the tuples read back from the stored output with the expected results
+ Util.checkQueryOutputs(outputs.iterator(), expectedResults);
+
+ // Cleanup (NOTE(review): op is never closed, and this is skipped if the check above fails; consider try/finally)
+ pigServer.deleteFile(output);
+ }
+
+ @Test
+ public void testOutputShipSpecs() throws Exception {
+ PigServer pigServer = new PigServer(MAPREDUCE);
+
+ File input = Util.createInputFile("tmp", "",
+ new String[] {"A,1", "B,2", "C,3",
+ "D,2", "A,5", "B,5",
+ "C,8", "A,8", "D,8",
+ "A,9"});
+
+ // Perl script: for each STDIN line, writes "A,10" to the file named by ARGV[0] and a copy of the line to ARGV[1] and STDERR; die messages name the argument that failed to open
+ String[] script =
+ new String[] {
+ "#!/usr/bin/perl",
+ "open(OUTFILE, \">\", $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";",
+ "open(OUTFILE2, \">\", $ARGV[1]) or die \"Can't open \".$ARGV[1].\"!: $!\";",
+ "while (<STDIN>) {",
+ " print OUTFILE \"A,10\n\";",
+ " print STDERR \"STDERR: $_\n\";",
+ " print OUTFILE2 \"Secondary Output: $_\n\";",
+ "}",
+ };
+ File command = Util.createInputFile("script", "pl", script);
+
+ // Expected results: one "A,10" row per input row that passes the filter $1 > '3' below (six rows)
+ String[] expectedFirstFields =
+ new String[] {"A", "A", "A", "A", "A", "A"};
+ int[] expectedSecondFields = new int[] {10, 10, 10, 10, 10, 10};
+ Tuple[] expectedResults =
+ setupExpectedResults(expectedFirstFields, expectedSecondFields);
+
+ // Pig query to run: CMD runs the shipped script with arguments 'foo bar'; output(...) declares both files (PigStorage, ','), and the test expects the 'foo' contents as the stream result
+ pigServer.registerQuery(
+ "define CMD `." + command + " foo bar` " +
+ "ship ('" + command + "') " +
+ "output('foo' using " + PigStorage.class.getName() + "(','), " +
+ "'bar' using " + PigStorage.class.getName() + "(',')) " +
+ "stderr();");
+ pigServer.registerQuery("IP = load 'file:" + input + "' using " +
+ PigStorage.class.getName() + "(',');");
+ pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
+ pigServer.registerQuery("OP = stream FILTERED_DATA through CMD;");
+
+ String output = "/pig/out";
+ pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
+
+ InputStream op = FileLocalizer.open(output, pigServer.getPigContext());
+ PigStorage ps = new PigStorage(",");
+ ps.bindTo("", new BufferedPositionedInputStream(op), 0, Long.MAX_VALUE);
+ List<Tuple> outputs = new ArrayList<Tuple>();
+ Tuple t;
+ while ((t = ps.getNext()) != null) {
+ outputs.add(t);
+ }
+
+ // Compare the tuples read back from the stored output with the expected results
+ Util.checkQueryOutputs(outputs.iterator(), expectedResults);
+ // Cleanup (NOTE(review): op is never closed, and this is skipped if the check above fails; consider try/finally)
+ pigServer.deleteFile(output);
+ }
+
+ @Test
+ public void testInputOutputSpecsWithAutoShip() throws Exception {
+ PigServer pigServer = new PigServer(MAPREDUCE);
+
+ File input = Util.createInputFile("tmp", "",
+ new String[] {"A,1", "B,2", "C,3",
+ "D,2", "A,5", "B,5",
+ "C,8", "A,8", "D,8",
+ "A,9"});
+
+ // Perl script: reads the file named by ARGV[0], writes each line to the file named by ARGV[1], and writes a prefixed copy to ARGV[2] and STDERR
+ String[] script =
+ new String[] {
+ "#!/usr/bin/perl",
+ "open(INFILE, $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";",
+ "open(OUTFILE, \">\", $ARGV[1]) or die \"Can't open \".$ARGV[1].\"!: $!\";",
+ "open(OUTFILE2, \">\", $ARGV[2]) or die \"Can't open \".$ARGV[2].\"!: $!\";",
+ "while (<INFILE>) {",
+ " chomp $_;",
+ " print OUTFILE \"$_\n\";",
+ " print STDERR \"STDERR: $_\n\";",
+ " print OUTFILE2 \"Secondary Output: $_\n\";",
+ "}",
+ };
+ File command = Util.createInputFile("script", "pl", script);
+
+ // Expected results: the input rows that pass the filter $1 > '3', as copied by the script into 'bar'
+ String[] expectedFirstFields =
+ new String[] {"A", "B", "C", "A", "D", "A"};
+ int[] expectedSecondFields = new int[] {5, 5, 8, 8, 8, 9};
+ Tuple[] expectedResults =
+ setupExpectedResults(expectedFirstFields, expectedSecondFields);
+
+ // Pig query to run: CMD runs the shipped script with 'foo bar foobar'; input() declares 'foo' as its input file and output() declares 'bar' and 'foobar' (all PigStorage, ',')
+ pigServer.registerQuery(
+ "define CMD `." + command + " foo bar foobar` " +
+ "ship ('" + command + "') " +
+ "input('foo' using " + PigStorage.class.getName() + "(',')) " +
+ "output('bar' using " + PigStorage.class.getName() + "(','), " +
+ "'foobar' using " + PigStorage.class.getName() + "(',')) " +
+ "stderr();");
+ pigServer.registerQuery("IP = load 'file:" + input + "' using " +
+ PigStorage.class.getName() + "(',');");
+ pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
+ pigServer.registerQuery("OP = stream FILTERED_DATA through CMD;");
+
+ String output = "/pig/out";
+ pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
+
+ InputStream op = FileLocalizer.open(output, pigServer.getPigContext());
+ PigStorage ps = new PigStorage(",");
+ ps.bindTo("", new BufferedPositionedInputStream(op), 0, Long.MAX_VALUE);
+ List<Tuple> outputs = new ArrayList<Tuple>();
+ Tuple t;
+ while ((t = ps.getNext()) != null) {
+ outputs.add(t);
+ }
+
+ // Compare the tuples read back from the stored output with the expected results
+ Util.checkQueryOutputs(outputs.iterator(), expectedResults);
+
+ // Cleanup (NOTE(review): op is never closed, and this is skipped if the check above fails; consider try/finally)
+ pigServer.deleteFile(output);
+ }
}