You are viewing a plain-text version of this content. The canonical link to it (the "here" hyperlink in the original page) was lost in this plain-text rendering.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/09/04 23:25:43 UTC
svn commit: r692253 [1/3] - in /incubator/pig/branches/types: ./
src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/
src/org/apache/pig/backend/hadoop/ex... [subject line truncated in the archive]
Author: olga
Date: Thu Sep 4 14:25:41 2008
New Revision: 692253
URL: http://svn.apache.org/viewvc?rev=692253&view=rev
Log:
Merged streaming into the types branch.
Added:
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/streaming/
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
incubator/pig/branches/types/src/org/apache/pig/impl/streaming/
incubator/pig/branches/types/src/org/apache/pig/impl/streaming/DefaultInputHandler.java
incubator/pig/branches/types/src/org/apache/pig/impl/streaming/DefaultOutputHandler.java
incubator/pig/branches/types/src/org/apache/pig/impl/streaming/ExecutableManager.java
incubator/pig/branches/types/src/org/apache/pig/impl/streaming/FileInputHandler.java
incubator/pig/branches/types/src/org/apache/pig/impl/streaming/FileOutputHandler.java
incubator/pig/branches/types/src/org/apache/pig/impl/streaming/HandlerFactory.java
incubator/pig/branches/types/src/org/apache/pig/impl/streaming/InputHandler.java
incubator/pig/branches/types/src/org/apache/pig/impl/streaming/OutputHandler.java
incubator/pig/branches/types/src/org/apache/pig/impl/streaming/StreamingCommand.java
Modified:
incubator/pig/branches/types/build.xml
incubator/pig/branches/types/src/org/apache/pig/PigServer.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/POStatus.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/Result.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java
incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java
incubator/pig/branches/types/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java
incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java
incubator/pig/branches/types/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
incubator/pig/branches/types/test/org/apache/pig/test/TestInputOutputFileValidator.java
incubator/pig/branches/types/test/org/apache/pig/test/TestJobSubmission.java
incubator/pig/branches/types/test/org/apache/pig/test/TestLocalJobSubmission.java
incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidator.java
incubator/pig/branches/types/test/org/apache/pig/test/TestTypeCheckingValidatorNoSchema.java
incubator/pig/branches/types/test/org/apache/pig/test/Util.java
incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java
incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java
incubator/pig/branches/types/test/org/apache/pig/test/utils/TypeCheckingTestUtil.java
incubator/pig/branches/types/test/org/apache/pig/test/utils/dotGraph/LogicalPlanLoader.java
Modified: incubator/pig/branches/types/build.xml
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/build.xml?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/build.xml (original)
+++ incubator/pig/branches/types/build.xml Thu Sep 4 14:25:41 2008
@@ -344,6 +344,7 @@
<exclude name="**/TestStoreOld.java" />
-->
<!-- Excluced because we don't want to run them -->
+ <exclude name="**/PigExecTestCase.java" />
<exclude name="**/TypeCheckingTestUtil.java" />
<exclude name="**/TypeGraphPrinter.java" />
<exclude name="**/LogicalPlanTester.java" />
Modified: incubator/pig/branches/types/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/PigServer.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/PigServer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/PigServer.java Thu Sep 4 14:25:41 2008
@@ -65,6 +65,7 @@
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.SplitIntroducer;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.streaming.StreamingCommand;
import org.apache.pig.impl.util.WrappedIOException;
import org.apache.pig.impl.util.PropertiesUtil;
import org.apache.pig.impl.logicalLayer.LODefine;
@@ -145,6 +146,16 @@
}
/**
+ * Add a path to be skipped while automatically shipping binaries for
+ * streaming.
+ *
+ * @param path path to be skipped
+ */
+ public void addPathToSkip(String path) {
+ pigContext.addPathToSkip(path);
+ }
+
+ /**
* Defines an alias for the given function spec. This
* is useful for functions that require arguments to the
* constructor.
@@ -171,6 +182,16 @@
pigContext.registerFunction(function, funcSpec);
}
+ /**
+ * Defines an alias for the given streaming command.
+ *
+ * @param commandAlias - the new command alias to define
+ * @param command - streaming command to be executed
+ */
+ public void registerStreamingCommand(String commandAlias, StreamingCommand command) {
+ pigContext.registerStreamCmd(commandAlias, command);
+ }
+
private URL locateJarFromResources(String jarName) throws IOException {
Enumeration<URL> urls = ClassLoader.getSystemResources(jarName);
URL resourceLocation = null;
@@ -395,7 +416,7 @@
stream.println("-----------------------------------------------");
pigContext.getExecutionEngine().explain(pp, stream);
-
+
} catch (Exception e) {
throw WrappedIOException.wrap("Unable to explain alias " +
alias, e);
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java Thu Sep 4 14:25:41 2008
@@ -210,6 +210,17 @@
POPostCombinerPackage newPack =
new POPostCombinerPackage(pack, bags);
mr.reducePlan.replace(pack, newPack);
+
+ // the replace() above only changes
+ // the plan and does not change "inputs" to
+ // operators
+ // set up "inputs" for the operator after
+ // package correctly
+ List<PhysicalOperator> packList = new ArrayList<PhysicalOperator>();
+ packList.add(newPack);
+ List<PhysicalOperator> sucs = mr.reducePlan.getSuccessors(newPack);
+ // there should be only one successor to package
+ sucs.get(0).setInputs(packList);
} catch (Exception e) {
throw new VisitorException(e);
}
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Thu Sep 4 14:25:41 2008
@@ -20,15 +20,21 @@
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Hashtable;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.BytesWritable;
@@ -59,10 +65,12 @@
import org.apache.pig.data.IndexedTuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.util.JarManager;
import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.Pair;
/**
* This is compiler class that takes an MROperPlan and converts
@@ -79,6 +87,8 @@
private final Log log = LogFactory.getLog(getClass());
+ public static final String LOG_DIR = "_logs";
+
/**
* The map between MapReduceOpers and their corresponding Jobs
*/
@@ -188,7 +198,7 @@
*/
private JobConf getJobConf(MapReduceOper mro, Configuration conf, PigContext pigContext) throws JobCreationException{
JobConf jobConf = new JobConf(conf);
- ArrayList<FileSpec> inp = new ArrayList<FileSpec>();
+ ArrayList<Pair<FileSpec, Boolean>> inp = new ArrayList<Pair<FileSpec, Boolean>>();
ArrayList<List<OperatorKey>> inpTargets = new ArrayList<List<OperatorKey>>();
//Set the User Name for this job. This will be
@@ -201,8 +211,11 @@
if(lds!=null && lds.size()>0){
for (PhysicalOperator operator : lds) {
POLoad ld = (POLoad)operator;
+
+ Pair<FileSpec, Boolean> p = new Pair<FileSpec, Boolean>(ld.getLFile(), ld.isSplittable());
//Store the inp filespecs
- inp.add(ld.getLFile());
+ inp.add(p);
+
//Store the target operators for tuples read
//from this input
List<PhysicalOperator> ldSucs = mro.mapPlan.getSuccessors(ld);
@@ -229,6 +242,12 @@
jobConf.set("pig.inpTargets", ObjectSerializer.serialize(inpTargets));
jobConf.set("pig.pigContext", ObjectSerializer.serialize(pigContext));
+ // Setup the DistributedCache for this job
+ setupDistributedCache(pigContext, jobConf, pigContext.getProperties(),
+ "pig.streaming.ship.files", true);
+ setupDistributedCache(pigContext, jobConf, pigContext.getProperties(),
+ "pig.streaming.cache.files", false);
+
jobConf.setInputFormat(PigInputFormat.class);
jobConf.setOutputFormat(PigOutputFormat.class);
@@ -247,17 +266,39 @@
FuncSpec outputFuncSpec = st.getSFile().getFuncSpec();
FileOutputFormat.setOutputPath(jobConf, new Path(outputPath));
jobConf.set("pig.storeFunc", outputFuncSpec.toString());
+
+ // Setup the logs directory for streaming jobs
+ jobConf.set("pig.streaming.log.dir",
+ new Path(new Path(outputPath), LOG_DIR).toString());
+ jobConf.set("pig.streaming.task.output.dir", outputPath);
+
// store map key type
// this is needed when the key is null to create
// an appropriate NullableXXXWritable object
jobConf.set("pig.map.keytype", ObjectSerializer.serialize(new byte[] { mro.mapKeyType }));
+
+ // set parent plan in all operators in map and reduce plans
+ // currently the parent plan is really used only when POStream is present in the plan
+ PhysicalPlan[] plans = new PhysicalPlan[] { mro.mapPlan, mro.reducePlan };
+ for (int i = 0; i < plans.length; i++) {
+ for (Iterator<PhysicalOperator> it = plans[i].iterator(); it.hasNext();) {
+ PhysicalOperator op = it.next();
+ op.setParentPlan(plans[i]);
+ }
+ }
if(mro.reducePlan.isEmpty()){
//MapOnly Job
jobConf.setMapperClass(PigMapOnly.Map.class);
jobConf.setNumReduceTasks(0);
jobConf.set("pig.mapPlan", ObjectSerializer.serialize(mro.mapPlan));
+ if(mro.isStreamInMap()) {
+ // this is used in Map.close() to decide whether the
+ // pipeline needs to be rerun one more time in the close()
+ // The pipeline is rerun only if there was a stream
+ jobConf.set("pig.stream.in.map", "true");
+ }
}
else{
//Map Reduce Job
@@ -277,14 +318,26 @@
jobConf.setNumReduceTasks(mro.requestedParallelism);
jobConf.set("pig.mapPlan", ObjectSerializer.serialize(mro.mapPlan));
+ if(mro.isStreamInMap()) {
+ // this is used in Map.close() to decide whether the
+ // pipeline needs to be rerun one more time in the close()
+ // The pipeline is rerun only if there was a stream
+ jobConf.set("pig.stream.in.map", "true");
+ }
jobConf.set("pig.reducePlan", ObjectSerializer.serialize(mro.reducePlan));
+ if(mro.isStreamInReduce()) {
+ // this is used in Map.close() to decide whether the
+ // pipeline needs to be rerun one more time in the close()
+ // The pipeline is rerun only if there was a stream
+ jobConf.set("pig.stream.in.reduce", "true");
+ }
jobConf.set("pig.reduce.package", ObjectSerializer.serialize(pack));
Class<? extends WritableComparable> keyClass = HDataType.getWritableComparableTypes(pack.getKeyType()).getClass();
jobConf.setOutputKeyClass(keyClass);
selectComparator(mro, pack.getKeyType(), jobConf);
jobConf.setOutputValueClass(IndexedTuple.class);
}
-
+
if(mro.isGlobalSort()){
jobConf.set("pig.quantilesFile", mro.getQuantFile());
jobConf.setPartitionerClass(SortPartitioner.class);
@@ -299,7 +352,6 @@
ObjectSerializer.serialize(mro.getSortOrder()));
}
}
-
return jobConf;
}catch(Exception e){
JobCreationException jce = new JobCreationException(e);
@@ -474,4 +526,56 @@
}
}
+ private static void setupDistributedCache(PigContext pigContext,
+ Configuration conf,
+ Properties properties, String key,
+ boolean shipToCluster)
+ throws IOException {
+ // Turn on the symlink feature
+ DistributedCache.createSymlink(conf);
+
+ // Set up the DistributedCache for this job
+ String fileNames = properties.getProperty(key);
+ if (fileNames != null) {
+ String[] paths = fileNames.split(",");
+
+ for (String path : paths) {
+ path = path.trim();
+ if (path.length() != 0) {
+ Path src = new Path(path);
+
+ // Ensure that 'src' is a valid URI
+ URI srcURI = null;
+ try {
+ srcURI = new URI(src.toString());
+ } catch (URISyntaxException ue) {
+ throw new IOException("Invalid cache specification, " +
+ "file doesn't exist: " + src);
+ }
+
+ // Ship it to the cluster if necessary and add to the
+ // DistributedCache
+ if (shipToCluster) {
+ Path dst =
+ new Path(FileLocalizer.getTemporaryPath(null, pigContext).toString());
+ FileSystem fs = dst.getFileSystem(conf);
+ fs.copyFromLocalFile(src, dst);
+
+ // Construct the dst#srcName uri for DistributedCache
+ URI dstURI = null;
+ try {
+ dstURI = new URI(dst.toString() + "#" + src.getName());
+ } catch (URISyntaxException ue) {
+ throw new IOException("Invalid ship specification, " +
+ "file doesn't exist: " + dst);
+ }
+ DistributedCache.addCacheFile(dstURI, conf);
+ } else {
+ DistributedCache.addCacheFile(srcURI, conf);
+ }
+ }
+ }
+ }
+ }
+
}
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Sep 4 14:25:41 2008
@@ -58,6 +58,7 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -268,7 +269,7 @@
}
private POLoad getLoad(){
- POLoad ld = new POLoad(new OperatorKey(scope,nig.getNextNodeId(scope)));
+ POLoad ld = new POLoad(new OperatorKey(scope,nig.getNextNodeId(scope)), true);
ld.setPc(pigContext);
return ld;
}
@@ -301,6 +302,7 @@
* @throws IOException
*/
private void nonBlocking(PhysicalOperator op) throws PlanException, IOException{
+
if (compiledInputs.length == 1) {
//For speed
MapReduceOper mro = compiledInputs[0];
@@ -622,6 +624,16 @@
}
}
+ public void visitStream(POStream op) throws VisitorException{
+ try{
+ nonBlocking(op);
+ }catch(Exception e){
+ VisitorException pe = new VisitorException(e.getMessage());
+ pe.initCause(e);
+ throw pe;
+ }
+ }
+
public void visitLimit(POLimit op) throws VisitorException{
try{
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Thu Sep 4 14:25:41 2008
@@ -35,6 +35,7 @@
import org.apache.pig.impl.PigContext;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRPrinter;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MRStreamHandler;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.impl.plan.PlanException;
@@ -128,11 +129,16 @@
CombinerOptimizer co = new CombinerOptimizer(plan);
co.visit();
}
+
+ // check whether stream operator is present
+ MRStreamHandler checker = new MRStreamHandler(plan);
+ checker.visit();
+
// figure out the type of the key for the map plan
// this is needed when the key is null to create
// an appropriate NullableXXXWritable object
KeyTypeDiscoveryVisitor kdv = new KeyTypeDiscoveryVisitor(plan);
- kdv.visit();
+ kdv.visit();
return plan;
}
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Thu Sep 4 14:25:41 2008
@@ -66,6 +66,14 @@
//is complete
boolean reduceDone = false;
+ // Indicates that there is POStream in the
+ // map plan
+ boolean streamInMap = false;
+
+ // Indicates that there is POStream in the
+ // reduce plan
+ boolean streamInReduce = false;
+
//Indicates if this job is an order by job
boolean globalSort = false;
@@ -230,4 +238,32 @@
public boolean[] getSortOrder() {
return sortOrder;
}
+
+ /**
+ * @return whether there is a POStream in the map plan
+ */
+ public boolean isStreamInMap() {
+ return streamInMap;
+ }
+
+ /**
+ * @param streamInMap the streamInMap to set
+ */
+ public void setStreamInMap(boolean streamInMap) {
+ this.streamInMap = streamInMap;
+ }
+
+ /**
+ * @return whether there is a POStream in the reduce plan
+ */
+ public boolean isStreamInReduce() {
+ return streamInReduce;
+ }
+
+ /**
+ * @param streamInReduce the streamInReduce to set
+ */
+ public void setStreamInReduce(boolean streamInReduce) {
+ this.streamInReduce = streamInReduce;
+ }
}
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java Thu Sep 4 14:25:41 2008
@@ -52,6 +52,7 @@
import org.apache.pig.impl.io.ValidatingInputFileSpec;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.Pair;
public class PigInputFormat implements InputFormat<Text, TargetedTuple>,
JobConfigurable {
@@ -175,7 +176,7 @@
*/
public InputSplit[] getSplits(JobConf job, int numSplits)
throws IOException {
- ArrayList<FileSpec> inputs = (ArrayList<FileSpec>) ObjectSerializer
+ ArrayList<Pair<FileSpec, Boolean>> inputs = (ArrayList<Pair<FileSpec, Boolean>>) ObjectSerializer
.deserialize(job.get("pig.inputs"));
ArrayList<ArrayList<OperatorKey>> inpTargets = (ArrayList<ArrayList<OperatorKey>>) ObjectSerializer
.deserialize(job.get("pig.inpTargets"));
@@ -184,7 +185,7 @@
ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
for (int i = 0; i < inputs.size(); i++) {
- Path path = new Path(inputs.get(i).getFileName());
+ Path path = new Path(inputs.get(i).first.getFileName());
FileSystem fs = path.getFileSystem(job);
// if the execution is against Mapred DFS, set
// working dir to /user/<userid>
@@ -193,10 +194,14 @@
DataStorage store = new HDataStorage(ConfigurationUtil.toProperties(job));
ValidatingInputFileSpec spec;
- if (inputs.get(i) instanceof ValidatingInputFileSpec) {
- spec = (ValidatingInputFileSpec) inputs.get(i);
+ if (inputs.get(i).first instanceof ValidatingInputFileSpec) {
+ spec = (ValidatingInputFileSpec) inputs.get(i).first;
} else {
- spec = new ValidatingInputFileSpec(inputs.get(i), store);
+ spec = new ValidatingInputFileSpec(inputs.get(i).first, store);
+ }
+ boolean isSplittable = inputs.get(i).second;
+ if (isSplittable && (spec.getSlicer() instanceof PigSlicer)) {
+ ((PigSlicer)spec.getSlicer()).setSplittable(isSplittable);
}
Slice[] pigs = spec.getSlicer().slice(store, spec.getFileName());
for (Slice split : pigs) {
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapBase.java Thu Sep 4 14:25:41 2008
@@ -17,29 +17,31 @@
import org.apache.pig.data.TargetedTuple;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.SpillableMemoryManager;
-import org.apache.pig.impl.util.WrappedIOException;
public abstract class PigMapBase extends MapReduceBase{
private final Log log = LogFactory.getLog(getClass());
protected byte keyType;
+
//Map Plan
protected PhysicalPlan mp;
+ OutputCollector<WritableComparable, Writable> outputCollector;
+
// Reporter that will be used by operators
// to transmit heartbeat
ProgressableReporter pigReporter;
+
+ private boolean errorInMap = false;
+
/**
* Will be called when all the tuples in the input
@@ -49,7 +51,31 @@
public void close() throws IOException {
super.close();
PhysicalOperator.setReporter(null);
+ if(errorInMap) {
+ //error in map - returning
+ return;
+ }
+
+ if(PigMapReduce.sJobConf.get("pig.stream.in.map", "false").equals("true")) {
+ // If there is a stream in the pipeline we could
+ // potentially have more to process - so lets
+ // set the flag stating that all map input has been sent
+ // already and then lets run the pipeline one more time
+ // This will result in nothing happening in the case
+ // where there is no stream in the pipeline
+ mp.endOfAllInput = true;
+ List<PhysicalOperator> leaves = mp.getLeaves();
+ PhysicalOperator leaf = leaves.get(0);
+ try {
+ runPipeline(leaf);
+ } catch (ExecException e) {
+ IOException ioe = new IOException("Error running pipeline in close() of map");
+ ioe.initCause(e);
+ throw ioe;
+ }
+ }
mp = null;
+
}
/**
@@ -98,6 +124,9 @@
OutputCollector<WritableComparable, Writable> oc,
Reporter reporter) throws IOException {
+ // cache the collector for use in runPipeline() which
+ // can be called from close()
+ this.outputCollector = oc;
pigReporter.setRep(reporter);
PhysicalOperator.setReporter(pigReporter);
@@ -113,35 +142,19 @@
}
for (OperatorKey targetKey : inpTuple.targetOps) {
+
PhysicalOperator target = mp.getOperator(targetKey);
Tuple t = inpTuple.toTuple();
target.attachInput(t);
}
-
List<PhysicalOperator> leaves = mp.getLeaves();
PhysicalOperator leaf = leaves.get(0);
+
try {
- while(true){
- Result res = leaf.getNext(inpTuple);
- if(res.returnStatus==POStatus.STATUS_OK){
- collect(oc,(Tuple)res.result);
- continue;
- }
-
- if(res.returnStatus==POStatus.STATUS_EOP)
- return;
-
- if(res.returnStatus==POStatus.STATUS_NULL)
- continue;
-
- if(res.returnStatus==POStatus.STATUS_ERR){
- IOException ioe = new IOException("Received Error while " +
- "processing the map plan.");
- throw ioe;
- }
- }
+ runPipeline(leaf);
+
} catch (ExecException e) {
IOException ioe = new IOException(e.getMessage());
ioe.initCause(e.getCause());
@@ -149,6 +162,43 @@
}
}
+ private void runPipeline(PhysicalOperator leaf) throws IOException, ExecException {
+ Tuple dummyTuple = null;
+ while(true){
+ Result res = leaf.getNext(dummyTuple);
+ if(res.returnStatus==POStatus.STATUS_OK){
+ collect(outputCollector,(Tuple)res.result);
+ continue;
+ }
+
+ if(res.returnStatus==POStatus.STATUS_EOP) {
+ return;
+ }
+
+ if(res.returnStatus==POStatus.STATUS_NULL)
+ continue;
+
+ if(res.returnStatus==POStatus.STATUS_ERR){
+ // remember that we had an issue so that in
+ // close() we can do the right thing
+ errorInMap = true;
+ // if there is an errmessage use it
+ String errMsg;
+ if(res.result != null) {
+ errMsg = "Received Error while " +
+ "processing the map plan: " + res.result;
+ } else {
+ errMsg = "Received Error while " +
+ "processing the map plan.";
+ }
+
+ IOException ioe = new IOException(errMsg);
+ throw ioe;
+ }
+ }
+
+ }
+
abstract public void collect(OutputCollector<WritableComparable, Writable> oc, Tuple tuple) throws ExecException, IOException;
/**
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigMapReduce.java Thu Sep 4 14:25:41 2008
@@ -100,6 +100,10 @@
private POPackage pack;
ProgressableReporter pigReporter;
+
+ private OutputCollector<WritableComparable, Writable> outputCollector;
+
+ private boolean errorInReduce = false;
/**
* Configures the Reduce plan, the POPackage operator
@@ -144,6 +148,9 @@
OutputCollector<WritableComparable, Writable> oc,
Reporter reporter) throws IOException {
+ // cache the collector for use in runPipeline()
+ // which could additionally be called from close()
+ this.outputCollector = oc;
pigReporter.setRep(reporter);
Object k = HDataType.convertToPigType(key);
@@ -160,31 +167,14 @@
return;
}
+ log.info("Attaching " + packRes + " to " + rp.getRoots());
rp.attachInput(packRes);
List<PhysicalOperator> leaves = rp.getLeaves();
PhysicalOperator leaf = leaves.get(0);
- while(true){
- Result redRes = leaf.getNext(t);
- if(redRes.returnStatus==POStatus.STATUS_OK){
- oc.collect(null, (Tuple)redRes.result);
- continue;
- }
-
- if(redRes.returnStatus==POStatus.STATUS_EOP) {
- return;
- }
-
- if(redRes.returnStatus==POStatus.STATUS_NULL)
- continue;
-
- if(redRes.returnStatus==POStatus.STATUS_ERR){
- IOException ioe = new IOException("Received Error while " +
- "processing the reduce plan.");
- throw ioe;
- }
- }
+ runPipeline(leaf);
+
}
if(res.returnStatus==POStatus.STATUS_NULL) {
@@ -204,6 +194,49 @@
}
}
+    /**
+     * Drains the reduce plan: repeatedly pulls results from {@code leaf} and
+     * forwards every successful tuple to the cached {@code outputCollector}.
+     *
+     * <p>Invoked from reduce() for each key, and once more from close() when
+     * "pig.stream.in.reduce" is "true" (after {@code rp.endOfAllInput} is set).
+     *
+     * @param leaf the leaf operator (first leaf) of the reduce plan
+     * @throws ExecException if the leaf fails while producing a result
+     * @throws IOException if the plan reports STATUS_ERR (errorInReduce is set
+     *         first) or if the output collector fails
+     */
+    private void runPipeline(PhysicalOperator leaf) throws ExecException, IOException {
+        // A Tuple-typed null is passed so the getNext(Tuple) variant is invoked.
+        Tuple nullTuple = null;
+        for (;;) {
+            Result res = leaf.getNext(nullTuple);
+            switch (res.returnStatus) {
+            case POStatus.STATUS_OK:
+                outputCollector.collect(null, (Tuple) res.result);
+                break;
+            case POStatus.STATUS_EOP:
+                return;
+            case POStatus.STATUS_ERR: {
+                // remember the failure so close() skips the final pipeline run
+                errorInReduce = true;
+                // include the operator-supplied error detail when present
+                String errMsg = (res.result == null)
+                        ? "Received Error while processing the reduce plan."
+                        : "Received Error while processing the reduce plan: " + res.result;
+                throw new IOException(errMsg);
+            }
+            default:
+                // STATUS_NULL (or any other status) yields no output; keep pulling
+                break;
+            }
+        }
+    }
/**
* Will be called once all the intermediate keys and values are
@@ -215,6 +248,30 @@
/*if(runnableReporter!=null)
runnableReporter.setDone(true);*/
PhysicalOperator.setReporter(null);
+
+ if(errorInReduce) {
+ // there was an error in reduce - just return
+ return;
+ }
+
+ if(PigMapReduce.sJobConf.get("pig.stream.in.reduce", "false").equals("true")) {
+ // If there is a stream in the pipeline we could
+ // potentially have more output to process - so
+ // set the flag stating that all reduce input has been sent
+ // already and then run the pipeline one more time.
+ // This will result in nothing happening in the case
+ // where there is no stream in the pipeline.
+ rp.endOfAllInput = true;
+ List<PhysicalOperator> leaves = rp.getLeaves();
+ PhysicalOperator leaf = leaves.get(0);
+ try {
+ runPipeline(leaf);
+ } catch (ExecException e) {
+ IOException ioe = new IOException("Error running pipeline in close() of reduce");
+ ioe.initCause(e);
+ throw ioe;
+ }
+ }
}
}
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Thu Sep 4 14:25:41 2008
@@ -728,6 +728,27 @@
}
@Override
+    public void visit(LOStream stream) throws VisitorException {
+        // Translate a logical stream into a POStream fed by the physical
+        // translation of its (first) predecessor. POStream does not support
+        // multiple inputs. Validate the input before mutating the plan so a
+        // malformed logical plan fails with a clear message instead of an
+        // IndexOutOfBounds/NullPointerException.
+        List<LogicalOperator> op = stream.getPlan().getPredecessors(stream);
+        if (op == null || op.isEmpty()) {
+            String msg = "No predecessor found for stream operator " + stream;
+            log.error(msg);
+            throw new VisitorException(msg);
+        }
+        PhysicalOperator from = LogToPhyMap.get(op.get(0));
+        if (from == null) {
+            String msg = "Predecessor of stream operator " + stream
+                    + " has not been translated to a physical operator";
+            log.error(msg);
+            throw new VisitorException(msg);
+        }
+
+        String scope = stream.getOperatorKey().scope;
+        POStream poStream = new POStream(new OperatorKey(scope, nodeGen
+                .getNextNodeId(scope)), stream.getExecutableManager(),
+                stream.getStreamingCommand(), this.pc.getProperties());
+        currentPlan.add(poStream);
+        LogToPhyMap.put(stream, poStream);
+
+        try {
+            currentPlan.connect(from, poStream);
+        } catch (PlanException e) {
+            // separator added so the cause is not glued onto the sentence
+            log.error("Invalid physical operators in the physical plan: "
+                    + e.getMessage());
+            throw new VisitorException(e);
+        }
+    }
+
+ @Override
public void visit(LOProject op) throws VisitorException {
String scope = op.getOperatorKey().scope;
POProject exprOp = new POProject(new OperatorKey(scope, nodeGen
@@ -971,7 +992,7 @@
// This would be a root operator. We don't need to worry about finding
// its predecessors
POLoad load = new POLoad(new OperatorKey(scope, nodeGen
- .getNextNodeId(scope)));
+ .getNextNodeId(scope)), loLoad.isSplittable());
load.setLFile(loLoad.getInputFile());
load.setPc(pc);
load.setResultType(loLoad.getType());
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/POStatus.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/POStatus.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/POStatus.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/POStatus.java Thu Sep 4 14:25:41 2008
@@ -26,5 +26,9 @@
public static final byte STATUS_EOP = 3; // end of processing
+ // This is currently only used in communications
+ // between ExecutableManager and POStream
+ public static final byte STATUS_EOS = 4; // end of Streaming output (i.e. output from streaming binary)
+
public static Object result;
}
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Thu Sep 4 14:25:41 2008
@@ -30,6 +30,7 @@
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.impl.plan.Operator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
@@ -75,6 +76,9 @@
// The data type for the results of this operator
protected byte resultType = DataType.TUPLE;
+ // The physical plan this operator is part of
+ protected PhysicalPlan parentPlan;
+
// Specifies if the input has been directly attached
protected boolean inputAttached = false;
@@ -294,4 +298,12 @@
protected void cloneHelper(PhysicalOperator op) {
resultType = op.resultType;
}
+
+    /**
+     * Records the {@link PhysicalPlan} that contains this operator, so that
+     * operators such as POStream can consult plan-wide state (for example
+     * {@code endOfAllInput}).
+     *
+     * @param physicalPlan the plan this operator is part of
+     */
+    public void setParentPlan(PhysicalPlan physicalPlan) {
+        this.parentPlan = physicalPlan;
+    }
+
}
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/Result.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/Result.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/Result.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/Result.java Thu Sep 4 14:25:41 2008
@@ -25,6 +25,13 @@
*/
private static final long serialVersionUID = 1L;
+ /*
+ * When returnStatus is set to POStatus.STATUS_ERR,
+ * operators can choose to use the result field
+ * to hold a meaningful error message, which will be
+ * included in the final message shown to the user.
+ */
+
public byte returnStatus;
public Object result;
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Thu Sep 4 14:25:41 2008
@@ -210,4 +210,13 @@
//do nothing
}
+    /**
+     * Visits a {@link POStream} operator. The default implementation does
+     * nothing; visitors that care about streaming operators may override it.
+     *
+     * @param stream the streaming operator being visited
+     * @throws VisitorException declared so that overriding visitors may fail
+     */
+    public void visitStream(POStream stream) throws VisitorException {
+        //do nothing
+    }
+
}
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java Thu Sep 4 14:25:41 2008
@@ -23,6 +23,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -44,6 +45,12 @@
*
*/
private static final long serialVersionUID = 1L;
+
+ // marker to indicate whether all input for this plan
+ // has been sent - this is currently only used in POStream
+ // to know if all map() calls and reduce() calls are finished
+ // and that there is no more input expected.
+ public boolean endOfAllInput = false;
public PhysicalPlan() {
super();
@@ -78,6 +85,7 @@
@Override
public void connect(PhysicalOperator from, PhysicalOperator to)
throws PlanException {
+        // Add the edge in the generic plan, then refresh the target's input list from its (now updated) predecessors.
super.connect(from, to);
to.setInputs(getPredecessors(to));
}
@@ -95,12 +103,32 @@
List<PhysicalOperator> sucs = getSuccessors(op);
if(sucs!=null && sucs.size()!=0){
for (PhysicalOperator suc : sucs) {
- suc.setInputs(null);
+ // The successor could have multiple inputs
+ // (for example, POUnion): remove op from
+ // its list of inputs, and if after removal
+ // there are no other inputs, set the successor's
+ // inputs to null.
+ List<PhysicalOperator> succInputs = suc.getInputs();
+ succInputs.remove(op);
+ if(succInputs.size() == 0)
+ suc.setInputs(null);
+ else
+ suc.setInputs(succInputs);
}
}
super.remove(op);
}
+ /* (non-Javadoc)
+ * @see org.apache.pig.impl.plan.OperatorPlan#add(org.apache.pig.impl.plan.Operator)
+ @Override
+ public void add(PhysicalOperator op) {
+ // attach this plan as the plan the operator is part of
+ //op.setParentPlan(this);
+ super.add(op);
+ }
+*/
+
public boolean isEmpty() {
return (mOps.size() == 0);
}
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java Thu Sep 4 14:25:41 2008
@@ -60,20 +60,23 @@
PigContext pc;
//Indicates whether the loader setup is done or not
boolean setUpDone = false;
+ // Indicates whether the filespec is splittable
+ boolean splittable = true;
private final Log log = LogFactory.getLog(getClass());
- public POLoad(OperatorKey k) {
- this(k,-1, null);
+    /**
+     * Creates a load operator whose file spec will be supplied later
+     * (e.g. through setLFile).
+     *
+     * @param k operator key
+     * @param splittable whether the input file spec may be split
+     */
+    public POLoad(OperatorKey k, boolean splittable) {
+        this(k, -1, null, splittable);
}
- public POLoad(OperatorKey k, FileSpec lFile){
- this(k,-1,lFile);
+    /**
+     * Creates a load operator for the given file spec.
+     *
+     * @param k operator key
+     * @param lFile the file spec to load
+     * @param splittable whether the input file spec may be split
+     */
+    public POLoad(OperatorKey k, FileSpec lFile, boolean splittable){
+        this(k, -1, lFile, splittable);
}
- public POLoad(OperatorKey k, int rp, FileSpec lFile) {
+    /**
+     * Creates a load operator.
+     *
+     * @param k operator key
+     * @param rp value passed to the PhysicalOperator constructor; presumably
+     *        the requested parallelism (-1 from the other constructors) - TODO confirm
+     * @param lFile the file spec to load (may be null and set later)
+     * @param splittable whether the input file spec may be split
+     */
+    public POLoad(OperatorKey k, int rp, FileSpec lFile, boolean splittable) {
super(k, rp);
+        // Previously the lFile argument was silently dropped, so the
+        // (k, lFile, splittable) constructor produced a load with no file spec.
+        this.lFile = lFile;
+        this.splittable = splittable;
}
/**
@@ -182,4 +185,12 @@
this.pc = pc;
}
+
+    /**
+     * Tells whether the file spec read by this load may be split.
+     *
+     * @return {@code true} if the input is splittable
+     */
+    public boolean isSplittable() {
+        return this.splittable;
+    }
+
}
Added: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java?rev=692253&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java Thu Sep 4 14:25:41 2008
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.streaming.ExecutableManager;
+import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.impl.streaming.StreamingCommand.Handle;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Physical operator that pipes tuples through an external streaming binary.
+ *
+ * <p>Tuples pulled from the predecessor are handed to the binary through
+ * {@link #getBinaryInputQueue()}, and results produced by the binary are read
+ * back from {@link #getBinaryOutputQueue()}. Both queues have capacity one.
+ * They are presumably serviced by threads owned by the
+ * {@link ExecutableManager} (TODO confirm against ExecutableManager).
+ *
+ * <p>The {@link ExecutableManager} is instantiated lazily when the first input
+ * tuple arrives, so it stays {@code null} in tasks that never see any input.
+ */
+public class POStream extends PhysicalOperator {
+    private static final long serialVersionUID = 2L;
+
+    private String executableManagerStr; // String representing ExecutableManager to use
+    private ExecutableManager executableManager; // ExecutableManager to use (created lazily)
+    private StreamingCommand command; // Actual command to be run
+    private StreamingCommand originalCommand; // Original command
+    private StreamingCommand optimizedCommand; // Optimized command
+    private Properties properties;
+
+    // true once the ExecutableManager has been created, configured and started
+    private boolean initialized = false;
+
+    // results produced by the streaming binary, waiting to be passed downstream
+    private BlockingQueue<Result> binaryOutputQueue = new ArrayBlockingQueue<Result>(1);
+
+    // input waiting to be sent to the streaming binary
+    private BlockingQueue<Result> binaryInputQueue = new ArrayBlockingQueue<Result>(1);
+
+    // set once the final EOP from the predecessor has been forwarded to the binary
+    private boolean allInputFromPredecessorConsumed = false;
+
+    // set once the binary's EOS has been seen from within map()/reduce()
+    private boolean allOutputFromBinaryProcessed = false;
+
+    /**
+     * @param k operator key
+     * @param executableManager prototype whose class name is recorded; a fresh
+     *        instance of that class is created when the first input arrives
+     * @param command the streaming command to run
+     * @param properties properties into which the command's ship/cache file
+     *        specs are merged (modified in place)
+     */
+    public POStream(OperatorKey k, ExecutableManager executableManager,
+            StreamingCommand command, Properties properties) {
+        super(k);
+        this.executableManagerStr = executableManager.getClass().getName();
+        this.originalCommand = command;
+        this.command = command;
+        this.properties = properties;
+
+        // Setup streaming-specific properties
+        if (command.getShipFiles()) {
+            parseShipCacheSpecs(command.getShipSpecs(),
+                    properties, "pig.streaming.ship.files");
+        }
+        parseShipCacheSpecs(command.getCacheSpecs(),
+                properties, "pig.streaming.cache.files");
+    }
+
+    /**
+     * Appends {@code specs} as a ", "-separated list to {@code property},
+     * keeping any value that is already present. Does nothing when
+     * {@code specs} is null or empty.
+     */
+    private static void parseShipCacheSpecs(List<String> specs,
+            Properties properties, String property) {
+        if (specs == null || specs.isEmpty()) {
+            return;
+        }
+
+        StringBuilder sb = new StringBuilder();
+        // first append any existing value; specs is non-empty so a separator follows
+        String existingValue = properties.getProperty(property, "");
+        if (!existingValue.equals("")) {
+            sb.append(existingValue).append(", ");
+        }
+        Iterator<String> i = specs.iterator();
+        while (i.hasNext()) {
+            sb.append(i.next());
+            if (i.hasNext()) {
+                sb.append(", ");
+            }
+        }
+        properties.setProperty(property, sb.toString());
+    }
+
+    /**
+     * @return the properties object into which the ship/cache specs were merged
+     */
+    public Properties getShipCacheProperties() {
+        return properties;
+    }
+
+    /**
+     * Get the {@link StreamingCommand} for this operator.
+     * @return the command currently in effect (the optimized command once
+     *         {@link #setOptimizedSpec} has been called)
+     */
+    public StreamingCommand getCommand() {
+        return command;
+    }
+
+    /**
+     * Set the optimized {@link HandleSpec} for the given {@link Handle} of this
+     * operator. The first call clones the current command; later calls update
+     * that clone, which becomes the command in effect.
+     *
+     * @param handle <code>Handle</code> to optimize
+     * @param spec optimized specification for the handle
+     */
+    public void setOptimizedSpec(Handle handle, String spec) {
+        if (optimizedCommand == null) {
+            optimizedCommand = (StreamingCommand)command.clone();
+        }
+
+        if (handle == Handle.INPUT) {
+            HandleSpec streamInputSpec = optimizedCommand.getInputSpec();
+            streamInputSpec.setSpec(spec);
+        } else if (handle == Handle.OUTPUT) {
+            HandleSpec streamOutputSpec = optimizedCommand.getOutputSpec();
+            streamOutputSpec.setSpec(spec);
+        }
+
+        command = optimizedCommand;
+    }
+
+    /**
+     * Restore the original spec for the given handle on the command in effect.
+     * This is a no-op if no optimization was ever applied.
+     *
+     * @param handle the <code>Handle</code> whose spec should be reverted
+     */
+    public void revertOptimizedCommand(Handle handle) {
+        if (optimizedCommand == null) {
+            return;
+        }
+
+        if (handle == Handle.INPUT &&
+                !command.getInputSpec().equals(originalCommand.getInputSpec())) {
+            command.setInputSpec(originalCommand.getInputSpec());
+        } else if (handle == Handle.OUTPUT &&
+                !command.getOutputSpec().equals(
+                        originalCommand.getOutputSpec())) {
+            command.setOutputSpec(originalCommand.getOutputSpec());
+        }
+    }
+
+    /**
+     * Rewrites an End-Of-Stream result from the binary into an
+     * End-Of-Processing result for the successor.
+     *
+     * @return true if {@code r} was an EOS and has been rewritten
+     */
+    private static boolean convertEosToEop(Result r) {
+        if (r.returnStatus == POStatus.STATUS_EOS) {
+            r.returnStatus = POStatus.STATUS_EOP;
+            return true;
+        }
+        return false;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator#getNext(org.apache.pig.data.Tuple)
+     */
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+        // The POStream operator works with the ExecutableManager through two
+        // queues, one for input to the binary and one for output from it.
+        // In each getNext() call:
+        // 1) if no more output is expected from the binary, EOP is returned;
+        // 2) if the binary has output queued, it is passed to the successor;
+        // 3) otherwise, if the binary can accept input, the next tuple from
+        //    the predecessor is forwarded to it.
+        try {
+            // All output of the binary was already delivered from map()/reduce().
+            if (allOutputFromBinaryProcessed) {
+                return new Result(POStatus.STATUS_EOP, null);
+            }
+
+            // Every input was sent (we are in close()); only drain the binary's output.
+            // EOS becomes EOP; after that getNext() is not expected to be called again.
+            if (allInputFromPredecessorConsumed) {
+                Result r = binaryOutputQueue.take();
+                convertEosToEop(r);
+                return r;
+            }
+
+            // Called from close() of map or reduce: no further input will arrive.
+            if (this.parentPlan.endOfAllInput) {
+                Result r = getNextHelper(t);
+                if (r.returnStatus == POStatus.STATUS_EOP) {
+                    // All possible input has now been seen. If the binary was
+                    // never started (no real input), just pass EOP down.
+                    if (initialized) {
+                        // signal end of ALL input to the ExecutableManager's input handler
+                        binaryInputQueue.put(r);
+                        allInputFromPredecessorConsumed = true;
+                        r = binaryOutputQueue.take();
+                        convertEosToEop(r);
+                    }
+                } else {
+                    convertEosToEop(r);
+                }
+                return r;
+            }
+
+            // Called from map() or reduce().
+            Result r = getNextHelper(t);
+            if (convertEosToEop(r)) {
+                // remember that the binary has produced all of its output
+                allOutputFromBinaryProcessed = true;
+            }
+            return r;
+        } catch (ExecException e) {
+            // already descriptive - do not wrap it a second time
+            throw e;
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new ExecException("Error while trying to get next result in POStream", e);
+        } catch (Exception e) {
+            throw new ExecException("Error while trying to get next result in POStream", e);
+        }
+    }
+
+    /**
+     * Returns queued output from the binary if there is any; otherwise feeds
+     * the next predecessor result to the binary, starting the
+     * ExecutableManager on the first input. Predecessor EOP/ERR results are
+     * returned directly.
+     */
+    public Result getNextHelper(Tuple t) throws ExecException {
+        try {
+            synchronized (this) {
+                while (true) {
+                    // if there is something in binary output Queue, return it
+                    if (!binaryOutputQueue.isEmpty()) {
+                        return binaryOutputQueue.take();
+                    }
+
+                    if (binaryInputQueue.remainingCapacity() > 0) {
+                        Result input = processInput();
+                        if (input.returnStatus == POStatus.STATUS_EOP ||
+                                input.returnStatus == POStatus.STATUS_ERR) {
+                            return input;
+                        }
+
+                        // Initialize the ExecutableManager only once we have a
+                        // tuple to send: some tasks (e.g. a Union produced by a
+                        // JOIN) may never see input for this operator.
+                        if (!initialized) {
+                            executableManager =
+                                (ExecutableManager)PigContext.instantiateFuncFromSpec(executableManagerStr);
+                            try {
+                                executableManager.configure(this);
+                                executableManager.run();
+                            } catch (IOException e) {
+                                throw new ExecException("Error while running streaming binary", e);
+                            }
+                            initialized = true;
+                        }
+
+                        // send this input to the streaming process
+                        binaryInputQueue.put(input);
+                    } else {
+                        // Wait for output to be available or input to be consumed.
+                        // NOTE(review): relies on the ExecutableManager's threads
+                        // calling notify on this operator - TODO confirm.
+                        while (binaryOutputQueue.isEmpty() && !binaryInputQueue.isEmpty()) {
+                            wait();
+                        }
+                    }
+                }
+            }
+        } catch (ExecException e) {
+            // already descriptive - do not wrap it a second time
+            throw e;
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new ExecException("Error while trying to get next result in POStream", e);
+        } catch (Exception e) {
+            throw new ExecException("Error while trying to get next result in POStream", e);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "POStream" + "[" + command.toString() + "]" + " - " + mKey.toString();
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator#visit(org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor)
+     */
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visitStream(this);
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.impl.plan.Operator#name()
+     */
+    @Override
+    public String name() {
+        return toString();
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.impl.plan.Operator#supportsMultipleInputs()
+     */
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.pig.impl.plan.Operator#supportsMultipleOutputs()
+     */
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    /**
+     * Shuts down the streaming binary if it was ever started. The
+     * ExecutableManager is created lazily on the first input tuple, so a task
+     * that never sent input to this operator has nothing to close. The
+     * previous code threw a NullPointerException in that case.
+     *
+     * @throws IOException if closing the ExecutableManager fails
+     */
+    public void finish() throws IOException {
+        if (executableManager != null) {
+            executableManager.close();
+        }
+    }
+
+    /**
+     * @return the Queue which has input to binary
+     */
+    public BlockingQueue<Result> getBinaryInputQueue() {
+        return binaryInputQueue;
+    }
+
+    /**
+     * @return the Queue which has output from binary
+     */
+    public BlockingQueue<Result> getBinaryOutputQueue() {
+        return binaryOutputQueue;
+    }
+}
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java Thu Sep 4 14:25:41 2008
@@ -137,17 +137,13 @@
while(true){
if(reporter!=null) reporter.progress();
res = inputs.get(ind).getNext(t);
- if(res.returnStatus == POStatus.STATUS_NULL)
- continue;
-
lastInd = ind + 1;
- if(res.returnStatus == POStatus.STATUS_ERR)
- return new Result();
-
- if (res.returnStatus == POStatus.STATUS_OK)
+ if(res.returnStatus == POStatus.STATUS_OK ||
+ res.returnStatus == POStatus.STATUS_NULL || res.returnStatus == POStatus.STATUS_ERR) {
return res;
-
+ }
+
if (res.returnStatus == POStatus.STATUS_EOP) {
done.set(ind);
break;
Added: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java?rev=692253&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java Thu Sep 4 14:25:41 2008
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.NumberFormat;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.impl.streaming.ExecutableManager;
+import org.apache.pig.impl.streaming.StreamingCommand.Handle;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
+
+/**
+ * {@link HadoopExecutableManager} is a specialization of
+ * {@link ExecutableManager} and provides HDFS-specific support for secondary
+ * outputs, task-logs etc.
+ *
+ * <code>HadoopExecutableManager</code> provides support for secondary outputs
+ * of the managed process and also persists the logs of the tasks on HDFS.
+ */
+public class HadoopExecutableManager extends ExecutableManager {
+    // The part-<partition> file name, similar to Hadoop's outputs
+    private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
+    static {
+        NUMBER_FORMAT.setMinimumIntegerDigits(5);
+        NUMBER_FORMAT.setGroupingUsed(false);
+    }
+
+    /**
+     * Returns the Hadoop-style output file name for a partition:
+     * <code>part-</code> followed by the partition number padded to at least
+     * five digits.
+     *
+     * @param partition partition number of the task
+     * @return the part file name for <code>partition</code>
+     */
+    static String getOutputName(int partition) {
+        return "part-" + NUMBER_FORMAT.format(partition);
+    }
+
+    /** Configuration of the running task (taken from PigMapReduce.sJobConf). */
+    JobConf job;
+
+    /** Directory receiving secondary outputs (pig.streaming.task.output.dir). */
+    String scriptOutputDir;
+    /** Directory for per-task stderr logs (pig.streaming.log.dir). */
+    String scriptLogDir;
+    /** Id of the running task (mapred.task.id). */
+    String taskId;
+
+    /** Stream for this task's stderr on HDFS, or null if not persisted. */
+    FSDataOutputStream errorStream;
+
+    // NOTE(review): not read anywhere in this class; kept for compatibility.
+    boolean writeHeaderFooter = false;
+
+    public HadoopExecutableManager() {}
+
+    /**
+     * Configures this manager for the given stream operator. Makes a relative
+     * executable runnable and captures the job configuration, the output and
+     * log directories, and the task id.
+     *
+     * @param stream the streaming operator being configured
+     * @throws IOException if the base configuration or the chmod fails
+     * @throws ExecException if the chmod is interrupted
+     */
+    @Override
+    public void configure(POStream stream)
+    throws IOException, ExecException {
+        super.configure(stream);
+
+        // Chmod +x the executable. We don't own absolute paths, so for those
+        // we just hope they are already executable.
+        File executable = new File(command.getExecutable());
+        if (!executable.isAbsolute()) {
+            try {
+                FileUtil.chmod(executable.toString(), "a+x");
+            } catch (InterruptedException ie) {
+                // Keep the interrupt visible to code further up the stack
+                Thread.currentThread().interrupt();
+                throw new ExecException(ie);
+            }
+        }
+
+        // Keep a reference to the task's JobConf
+        job = PigMapReduce.sJobConf;
+
+        // Save the output and log directories for the Pig Script
+        scriptOutputDir = job.get("pig.streaming.task.output.dir");
+        scriptLogDir = job.get("pig.streaming.log.dir", "_logs");
+
+        // Save the taskid
+        taskId = job.get("mapred.task.id");
+    }
+
+    /**
+     * Launches the command. Before that, it opens the HDFS file for this
+     * task's stderr (when the stderr is to be persisted) and writes the debug
+     * header.
+     *
+     * @throws IOException if launching the command fails
+     */
+    @Override
+    protected void exec() throws IOException {
+        // Create the HDFS file for the stderr of the task, if necessary
+        if (writeErrorToHDFS(command.getLogFilesLimit(), taskId)) {
+            try {
+                Path errorFile =
+                    new Path(new Path(scriptLogDir, command.getLogDir()), taskId);
+                errorStream = errorFile.getFileSystem(job).create(errorFile);
+            } catch (IOException ie) {
+                // Don't fail the task if we couldn't save its stderr on HDFS
+                System.err.println("Failed to create stderr file of task: " +
+                                   taskId + " in HDFS at " + scriptLogDir +
+                                   " with " + ie);
+                errorStream = null;
+            }
+        }
+
+        // Header for stderr file of the task
+        writeDebugHeader();
+
+        // Exec the command ...
+        super.exec();
+    }
+
+    /**
+     * Closes the managed process and copies its secondary outputs to the
+     * script's output directory. It then always writes the debug footer and
+     * closes the stderr file on HDFS.
+     *
+     * @throws IOException if closing the process or copying a secondary
+     *         output fails
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            super.close();
+            copySecondaryOutputs();
+        } finally {
+            try {
+                // Footer for stderr file of the task
+                writeDebugFooter();
+            } finally {
+                // Close the stderr file on HDFS even if the footer failed
+                FSDataOutputStream stream = errorStream;
+                errorStream = null;
+                if (stream != null) {
+                    stream.close();
+                }
+            }
+        }
+    }
+
+    /**
+     * Copies every output handle after the first (the secondary outputs) to
+     * <code>scriptOutputDir/&lt;fileName&gt;/part-NNNNN</code>, overwriting
+     * any existing destination file.
+     *
+     * @throws IOException if a secondary output cannot be copied
+     */
+    private void copySecondaryOutputs() throws IOException {
+        List<HandleSpec> outputSpecs = command.getHandleSpecs(Handle.OUTPUT);
+        if (outputSpecs == null || outputSpecs.size() < 2) {
+            // No secondary outputs: don't touch the output directory, whose
+            // property may be unset (JobConf.get then returns null).
+            return;
+        }
+
+        Path outputDir = new Path(scriptOutputDir);
+        FileSystem fs = outputDir.getFileSystem(job);
+        // The partition is the same for every output; compute it once
+        String partFile =
+            getOutputName(job.getInt("mapred.task.partition", -1));
+        for (int i = 1; i < outputSpecs.size(); ++i) {
+            String fileName = outputSpecs.get(i).getName();
+            try {
+                fs.copyFromLocalFile(false, true, new Path(fileName),
+                                     new Path(new Path(outputDir, fileName),
+                                              partFile));
+            } catch (IOException ioe) {
+                System.err.println("Failed to save secondary output '" +
+                                   fileName + "' of task: " + taskId +
+                                   " with " + ioe);
+                throw ioe;
+            }
+        }
+    }
+
+    /**
+     * Should the stderr data of this task be persisted on HDFS?
+     *
+     * Stderr is persisted only if the command asks for it and the task's
+     * task-in-progress number is below <code>limit</code>.
+     *
+     * @param limit maximum number of tasks whose stderr log-files are persisted
+     * @param taskId id of the task
+     * @return <code>true</code> if stderr data of task should be persisted on
+     *         HDFS, <code>false</code> otherwise (also when the task id cannot
+     *         be parsed)
+     */
+    private boolean writeErrorToHDFS(int limit, String taskId) {
+        if (!command.getPersistStderr()) {
+            return false;
+        }
+        int tipId = getTipId(taskId);
+        return tipId >= 0 && tipId < limit;
+    }
+
+    /**
+     * Extracts the task-in-progress number from a Hadoop task id.
+     *
+     * NOTE(review): assumes the number is the second-to-last '_'-separated
+     * field, as in <code>task_200809041425_0001_m_000001_0</code> or
+     * <code>attempt_200809041425_0001_m_000001_0</code>. Confirm this against
+     * the Hadoop version in use.
+     *
+     * @param taskId id of the task, may be <code>null</code>
+     * @return the task-in-progress number, or -1 if it cannot be determined
+     */
+    private static int getTipId(String taskId) {
+        if (taskId == null) {
+            return -1;
+        }
+        String[] fields = taskId.split("_");
+        if (fields.length < 2) {
+            return -1;
+        }
+        try {
+            return Integer.parseInt(fields[fields.length - 2]);
+        } catch (NumberFormatException nfe) {
+            return -1;
+        }
+    }
+
+    /**
+     * Hands <code>error</code> to the base class. When this task's stderr is
+     * being persisted, it also appends the text to the HDFS stderr file.
+     */
+    @Override
+    protected void processError(String error) {
+        super.processError(error);
+
+        if (errorStream == null) {
+            return;
+        }
+        try {
+            // writeBytes() would keep only the low byte of each char; encode
+            // the text properly instead.
+            errorStream.write(error.getBytes("UTF-8"));
+        } catch (IOException ioe) {
+            super.processError("Failed to save error logs to HDFS with: " +
+                               ioe);
+        }
+    }
+
+    /** Emits the task-information header through processError(String). */
+    private void writeDebugHeader() {
+        processError("===== Task Information Header =====" );
+
+        processError("\nCommand: " + command);
+        processError("\nStart time: " + new Date());
+        if (job.getBoolean("mapred.task.is.map", false)) {
+            processError("\nInput-split file: " + job.get("map.input.file"));
+            processError("\nInput-split start-offset: " +
+                         job.getLong("map.input.start", -1));
+            processError("\nInput-split length: " +
+                         job.getLong("map.input.length", -1));
+        }
+        processError("\n===== * * * =====\n");
+    }
+
+    /**
+     * Emits the task-information footer through processError(String). The
+     * footer holds the end time, exit code, and record/byte counts for the
+     * primary input and output, plus the sizes of the secondary outputs.
+     */
+    private void writeDebugFooter() {
+        processError("===== Task Information Footer =====");
+
+        processError("\nEnd time: " + new Date());
+        processError("\nExit code: " + exitCode);
+
+        HandleSpec inputSpec = firstSpec(command.getHandleSpecs(Handle.INPUT));
+        if (!isBinaryStorage(inputSpec)) {
+            processError("\nInput records: " + inputRecords);
+        }
+        processError("\nInput bytes: " + inputBytes + " bytes " +
+                     describe(inputSpec));
+
+        List<HandleSpec> outputSpecs = command.getHandleSpecs(Handle.OUTPUT);
+        HandleSpec outputSpec = firstSpec(outputSpecs);
+        if (!isBinaryStorage(outputSpec)) {
+            processError("\nOutput records: " + outputRecords);
+        }
+        processError("\nOutput bytes: " + outputBytes + " bytes " +
+                     describe(outputSpec));
+        if (outputSpecs != null) {
+            for (int i = 1; i < outputSpecs.size(); ++i) {
+                HandleSpec spec = outputSpecs.get(i);
+                processError("\n " + new File(spec.getName()).length()
+                             + " bytes using " + spec.getSpec());
+            }
+        }
+
+        processError("\n===== * * * =====\n");
+    }
+
+    /** Returns the first spec of <code>specs</code>, or null if there is none. */
+    private static HandleSpec firstSpec(List<HandleSpec> specs) {
+        return (specs == null || specs.isEmpty()) ? null : specs.get(0);
+    }
+
+    /** Whether <code>spec</code> is non-null and its spec names BinaryStorage. */
+    private static boolean isBinaryStorage(HandleSpec spec) {
+        return spec != null && spec.getSpec().contains("BinaryStorage");
+    }
+
+    /** "(name using spec)" for a non-null spec, the empty string otherwise. */
+    private static String describe(HandleSpec spec) {
+        return (spec != null)
+            ? "(" + spec.getName() + " using " + spec.getSpec() + ")"
+            : "";
+    }
+}
+
+
\ No newline at end of file
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java Thu Sep 4 14:25:41 2008
@@ -53,8 +53,11 @@
//import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.MapReduceLauncher;
//import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.PigMapReduce;
import org.apache.pig.backend.local.executionengine.LocalExecutionEngine;
-import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
+import org.apache.pig.backend.hadoop.streaming.HadoopExecutableManager;
+import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
+import org.apache.pig.impl.streaming.ExecutableManager;
+import org.apache.pig.impl.streaming.StreamingCommand;
import org.apache.pig.impl.util.JarManager;
import org.apache.pig.impl.util.WrappedIOException;
@@ -102,10 +105,19 @@
*/
private Map<String, FuncSpec> definedFunctions = new HashMap<String, FuncSpec>();
+ /**
+ * a table mapping names to streaming commands.
+ */
+ private Map<String, StreamingCommand> definedCommands =
+ new HashMap<String, StreamingCommand>();
+
private static ArrayList<String> packageImportList = new ArrayList<String>();
public boolean debug = true;
+ // List of paths skipped for automatic shipping
+ List<String> skippedShipPaths = new ArrayList<String>();
+
public PigContext() {
this(ExecType.MAPREDUCE, new Properties());
}
@@ -295,6 +307,22 @@
}
/**
+ * Defines an alias for the given streaming command.
+ *
+ * This is useful for complicated streaming command specs.
+ *
+ * @param alias - the new command alias to define.
+ * @param command - the command
+ */
+ public void registerStreamCmd(String alias, StreamingCommand command) {
+ if (command == null) {
+ definedCommands.remove(alias);
+ } else {
+ definedCommands.put(alias, command);
+ }
+ }
+
+ /**
* Returns the type of execution currently in effect.
*
* @return current execution type
@@ -435,9 +463,48 @@
return instantiateFuncFromSpec(alias);
}
+ /**
+ * Get the {@link StreamingCommand} for the given alias.
+ *
+ * @param alias the alias for the <code>StreamingCommand</code>
+ * @return <code>StreamingCommand</code> for the alias
+ */
+ public StreamingCommand getCommandForAlias(String alias) {
+ return definedCommands.get(alias);
+ }
+
+    /**
+     * Sets the execution type of this context.
+     *
+     * @param execType the new execution type
+     */
 public void setExecType(ExecType execType) {
 this.execType = execType;
 }
+
+    /**
+     * Create a new {@link ExecutableManager} depending on the ExecType.
+     *
+     * <code>LOCAL</code> yields a plain {@link ExecutableManager};
+     * <code>MAPREDUCE</code> yields a {@link HadoopExecutableManager}.
+     *
+     * @return a new {@link ExecutableManager} depending on the ExecType
+     * @throws ExecException if the current ExecType (including
+     *         <code>null</code>) has no executable manager
+     */
+    public ExecutableManager createExecutableManager() throws ExecException {
+        // Compare with == instead of switching on the enum, so that a null
+        // execType is reported as an ExecException rather than an NPE.
+        if (execType == ExecType.LOCAL) {
+            return new ExecutableManager();
+        }
+        if (execType == ExecType.MAPREDUCE) {
+            return new HadoopExecutableManager();
+        }
+        throw new ExecException("Unknown execType: " + execType);
+    }
public FuncSpec getFuncSpecFromAlias(String alias) {
FuncSpec funcSpec;
@@ -446,4 +513,25 @@
else
return null;
}
+
+    /**
+     * Registers a path that must not be shipped when binaries for streaming
+     * are shipped automatically.
+     *
+     * @param path path to be skipped
+     */
+    public void addPathToSkip(String path) {
+        this.skippedShipPaths.add(path);
+    }
+
+    /**
+     * Returns the paths that are to be skipped while binaries for streaming
+     * are shipped automatically. The returned list is the live internal
+     * list, not a copy.
+     *
+     * @return paths which are to be skipped while automatically shipping
+     *         binaries for streaming
+     */
+    public List<String> getPathsToSkip() {
+        return this.skippedShipPaths;
+    }
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/io/BufferedPositionedInputStream.java Thu Sep 4 14:25:41 2008
@@ -160,4 +160,9 @@
sb.append(carray, 0, cbuff.position());
return sb.toString();
}
+
+    /**
+     * Closes this stream and the wrapped stream <code>in</code>.
+     *
+     * @throws IOException if closing either stream fails
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            super.close();
+        } finally {
+            // Release the wrapped stream even if super.close() fails
+            in.close();
+        }
+    }
}