You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/09/04 23:25:43 UTC
svn commit: r692253 [2/3] - in /incubator/pig/branches/types: ./
src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/
src/org/apache/pig/backend/hadoop/ex...
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java Thu Sep 4 14:25:41 2008
@@ -414,6 +414,26 @@
return elem.exists() || globMatchesFiles(elem, store);
}
+ public static boolean isFile(String filename, PigContext context)
+ throws IOException {
+ return !isDirectory(filename, context.getDfs());
+ }
+
+ public static boolean isFile(String filename, DataStorage store)
+ throws IOException {
+ return !isDirectory(filename, store);
+ }
+
+ public static boolean isDirectory(String filename, PigContext context)
+ throws IOException {
+ return isDirectory(filename, context.getDfs());
+ }
+
+ public static boolean isDirectory(String filename, DataStorage store)
+ throws IOException {
+ ElementDescriptor elem = store.asElement(filename);
+ return (elem instanceof ContainerDescriptor);
+ }
private static boolean globMatchesFiles(ElementDescriptor elem,
DataStorage fs)
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java Thu Sep 4 14:25:41 2008
@@ -32,6 +32,7 @@
public class LOLoad extends LogicalOperator {
private static final long serialVersionUID = 2L;
+ protected boolean splittable = true;
private FileSpec mInputFileSpec;
private LoadFunc mLoadFunc;
@@ -51,11 +52,12 @@
*
*/
public LOLoad(LogicalPlan plan, OperatorKey key, FileSpec inputFileSpec,
- URL schemaFile) throws IOException {
+ URL schemaFile, boolean splittable) throws IOException {
super(plan, key);
mInputFileSpec = inputFileSpec;
mSchemaFile = schemaFile;
+ this.splittable = splittable;
try {
mLoadFunc = (LoadFunc)
@@ -139,6 +141,10 @@
this.mEnforcedSchema = enforcedSchema;
}
+ public boolean isSplittable() {
+ return splittable;
+ }
+
@Override
public byte getType() {
return DataType.BAG ;
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java Thu Sep 4 14:25:41 2008
@@ -173,6 +173,10 @@
protected void visit(LOLimit limOp) throws VisitorException {
return;
}
+
+ protected void visit(LOStream stream) throws VisitorException {
+ return;
+ }
/**
*
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Thu Sep 4 14:25:41 2008
@@ -47,6 +47,8 @@
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.BagFactory;
@@ -139,7 +141,7 @@
storePlan.add(store);
storePlan.add(input);
storePlan.connect(input, store);
- attachPlan(storePlan, input, readFrom);
+ attachPlan(storePlan, input, readFrom, new HashMap<LogicalOperator, Boolean>());
} catch (ParseException pe) {
throw new FrontendException(pe.getMessage());
}
@@ -316,6 +318,130 @@
return mapAliasOp.get(alias);
}
+ // Check and set files to be automatically shipped for the given StreamingCommand
+ // Auto-shipping rules:
+ // 1. If the command begins with either perl or python assume that the
+ // binary is the first non-quoted string it encounters that does not
+ // start with dash - subject to restrictions in (2).
+ // 2. Otherwise, attempt to ship the first string from the command line as
+ // long as it does not come from /bin, /usr/bin, /usr/local/bin.
+ // It will determine that by scanning the path if an absolute path is
+ // provided or by executing "which". The paths can be made configurable
+ // via "set stream.skippath <paths>" option.
+ private static final String PERL = "perl";
+ private static final String PYTHON = "python";
+ private void checkAutoShipSpecs(StreamingCommand command, String[] argv)
+ throws ParseException {
+ // Candidate for auto-ship
+ String arg0 = argv[0];
+
+ // Check if command is perl or python ... if so use the first non-option
+ // and non-quoted string as the candidate
+ if (arg0.equalsIgnoreCase(PERL) || arg0.equalsIgnoreCase(PYTHON)) {
+ for (int i=1; i < argv.length; ++i) {
+ if (!argv[i].startsWith("-") && !isQuotedString(argv[i])) {
+ checkAndShip(command, argv[i]);
+ break;
+ }
+ }
+ } else {
+ // Ship the first argument if it can be ...
+ checkAndShip(command, arg0);
+ }
+ }
+
+ private void checkAndShip(StreamingCommand command, String arg)
+ throws ParseException {
+ // Don't auto-ship if it is an absolute path...
+ if (arg.startsWith("/")) {
+ return;
+ }
+
+ // $ which arg
+ String argPath = which(arg);
+ if (argPath != null && !inSkipPaths(argPath)) {
+ try {
+ command.addPathToShip(argPath);
+ } catch(IOException e) {
+ throw new ParseException(e.getMessage());
+ }
+ }
+
+ }
+
+ private boolean isQuotedString(String s) {
+ return (s.charAt(0) == '\'' && s.charAt(s.length()-1) == '\'');
+ }
+
+ // Check if file is in the list of paths to be skipped
+ private boolean inSkipPaths(String file) {
+ for (String skipPath : pigContext.getPathsToSkip()) {
+ if (file.startsWith(skipPath)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+
+ private static String which(String file) {
+ try {
+ ProcessBuilder processBuilder =
+ new ProcessBuilder(new String[] {"which", file});
+ Process process = processBuilder.start();
+
+ BufferedReader stdout =
+ new BufferedReader(new InputStreamReader(process.getInputStream()));
+ String fullPath = stdout.readLine();
+
+ return (process.waitFor() == 0) ? fullPath : null;
+ } catch (Exception e) {}
+ return null;
+ }
+
+ private static final char SINGLE_QUOTE = '\'';
+ private static final char DOUBLE_QUOTE = '"';
+ private static String[] splitArgs(String command) throws ParseException {
+ List<String> argv = new ArrayList<String>();
+
+ int beginIndex = 0;
+
+ while (beginIndex < command.length()) {
+ // Skip spaces
+ while (Character.isWhitespace(command.charAt(beginIndex))) {
+ ++beginIndex;
+ }
+
+ char delim = ' ';
+ char charAtIndex = command.charAt(beginIndex);
+ if (charAtIndex == SINGLE_QUOTE || charAtIndex == DOUBLE_QUOTE) {
+ delim = charAtIndex;
+ }
+
+ int endIndex = command.indexOf(delim, beginIndex+1);
+ if (endIndex == -1) {
+ if (Character.isWhitespace(delim)) {
+ // Reached end of command-line
+ argv.add(command.substring(beginIndex));
+ break;
+ } else {
+ // Didn't find the ending quote/double-quote
+ throw new ParseException("Illegal command: " + command);
+ }
+ }
+
+ if (Character.isWhitespace(delim)) {
+ // Do not consume the space
+ argv.add(command.substring(beginIndex, endIndex));
+ } else {
+ argv.add(command.substring(beginIndex, endIndex+1));
+ }
+
+ beginIndex = endIndex + 1;
+ }
+
+ return argv.toArray(new String[argv.size()]);
+ }
//BEGIN
//I am maintaining state about the operators that should
@@ -371,8 +497,13 @@
return aliases.get(op);
}
- public static void attachPlan(LogicalPlan lp, LogicalOperator root, LogicalPlan rootPlan) throws ParseException {
+ public static void attachPlan(LogicalPlan lp, LogicalOperator root, LogicalPlan rootPlan, Map<LogicalOperator, Boolean> rootProcessed) throws ParseException {
log.trace("Entering attachPlan");
+ if((rootProcessed.get(root) != null) && (rootProcessed.get(root))) {
+ log.trace("Root has been processed");
+ log.trace("Exiting attachPlan");
+ return;
+ }
lp.add(root);
log.debug("Added operator " + root + " to the logical plan " + lp);
if(null == rootPlan.getPredecessors(root)) {
@@ -380,7 +511,8 @@
return;
}
for(LogicalOperator rootPred: rootPlan.getPredecessors(root)) {
- attachPlan(lp, rootPred, rootPlan);
+ attachPlan(lp, rootPred, rootPlan, rootProcessed);
+ rootProcessed.put(rootPred, true);
try {
lp.connect(rootPred, root);
log.debug("Connected operator " + rootPred + " to " + root + " in the logical plan " + lp);
@@ -558,8 +690,17 @@
TOKEN : { <MAP : "map"> }
TOKEN : { <IS : "is"> }
TOKEN : { <NULL : "null"> }
+TOKEN : { <STREAM : "stream"> }
+TOKEN : { <THROUGH : "through"> }
TOKEN : { <STORE : "store"> }
-TOKEN : { <LIMIT : "limit"> }
+TOKEN : { <SHIP: "ship"> }
+TOKEN : { <CACHE: "cache"> }
+TOKEN : { <INPUT: "input"> }
+TOKEN : { <OUTPUT: "output"> }
+TOKEN : { <ERROR: "stderr"> }
+TOKEN : { <STDIN: "stdin"> }
+TOKEN : { <STDOUT: "stdout"> }
+TOKEN : { <LIMIT: "limit"> }
TOKEN:
{
@@ -594,6 +735,7 @@
)*
"'"> }
+TOKEN : { <EXECCOMMAND : "`" (~["`"])* "`"> }
// Pig has special variables starting with $
TOKEN : { <DOLLARVAR : "$" <INTEGER> > }
@@ -628,6 +770,7 @@
roots.add(op);
}
+ Map<LogicalOperator, Boolean> rootProcessed = new HashMap<LogicalOperator, Boolean>();
for(LogicalOperator op: roots) {
//At this point we have a logical plan for the pig statement
//In order to construct the entire logical plan we need to traverse
@@ -638,7 +781,8 @@
LogicalPlan rootPlan = aliases.get(op);
if(null != rootPlan) {
- attachPlan(lp, op, rootPlan);
+ attachPlan(lp, op, rootPlan, rootProcessed);
+ rootProcessed.put(op, true);
}
}
@@ -791,6 +935,25 @@
| (<JOIN> op = JoinClause(lp))
| (<UNION> op = UnionClause(lp))
| (<FOREACH> op = ForEachClause(lp))
+| (<STREAM> op = StreamClause(lp)
+ [ <AS>
+ (
+ LOOKAHEAD(2) "(" schema = TupleSchema() ")"
+ {
+ SchemaUtils.setSchemaDefaultType(schema, DataType.BYTEARRAY);
+ op.setSchema(schema);
+ op.setCanonicalNames();
+ log.info("Stream as schema()"+ schema);
+ }
+ | fs = AtomSchema()
+ {
+ schema = new Schema(fs);
+ op.setSchema(schema);
+ log.info("Stream as atomschema()" + schema);
+ }
+ )
+ ]
+ )
| (<STORE> op = StoreClause(lp))
)
[<PARALLEL> t2=<INTEGER> { op.setRequestedParallelism(Integer.parseInt(t2.image));} ]
@@ -800,12 +963,14 @@
LogicalOperator LoadClause(LogicalPlan lp) :
{
- Token t1, t2;
+ Token t1, t2, t3;
String filename;
String funcName,funcArgs =null;
FuncSpec funcSpec = null;
String funcSpecAsString = null;
LOLoad lo=null;
+ String splitBy;
+ boolean splittable = true;
log.trace("Entering LoadClause");
}
{
@@ -818,13 +983,24 @@
log.debug("LoadClause: funcSpec = " + funcSpec);
}
)?
+ (
+ <SPLIT> <BY> t3 = <QUOTEDSTRING>
+ {
+ splitBy = unquote(t3.image);
+ if (splitBy.equalsIgnoreCase("file")) {
+ splittable = false;
+ }
+ }
+ )?
)
{
- if (funcSpec == null){
- funcSpec = new FuncSpec(PigStorage.class.getName());
+ if (funcSpecAsString == null){
+ funcSpecAsString = PigStorage.class.getName();
+ funcSpec = new FuncSpec(funcSpecAsString);
+ log.debug("LoadClause: funcSpec = " + funcSpec);
}
- lo = new LOLoad(lp, new OperatorKey(scope, getNextId()), new FileSpec(massageFilename(filename, pigContext), funcSpec), null);
+ lo = new LOLoad(lp, new OperatorKey(scope, getNextId()), new FileSpec(massageFilename(filename, pigContext), funcSpec), null, splittable);
lp.add(lo);
log.debug("Added operator " + lo.getClass().getName() + " to the logical plan");
@@ -1488,11 +1664,13 @@
the list of foreach plans
*/
+ Map<LogicalOperator, Boolean> rootProcessed = new HashMap<LogicalOperator, Boolean>();
for(LogicalOperator project: mapProjectInputs.keySet()) {
for(LogicalOperator projectInput: mapProjectInputs.get(project)) {
generatePlan.add(projectInput);
generatePlan.connect(projectInput, project);
- attachPlan(generatePlan, projectInput, foreachPlan);
+ attachPlan(generatePlan, projectInput, foreachPlan, rootProcessed);
+ rootProcessed.put(projectInput, true);
}
}
}
@@ -1517,15 +1695,104 @@
}
}
-LogicalOperator DefineClause(LogicalPlan lp) : {Token t; Token t1; String functionName, functionArgs;}
+LogicalOperator StreamClause(LogicalPlan lp):
+{
+ LogicalOperator input;
+ StreamingCommand command;
+}
+{
+ input = NestedExpr(lp)
+
+ <THROUGH> command = Command()
+ {
+ LOStream loStream = new LOStream(lp, new OperatorKey(scope, getNextId()), input,
+ pigContext.createExecutableManager(), command);
+ //addAlias(input.getAlias(), input);
+ lp.add(loStream);
+ lp.connect(input, loStream);
+ return loStream;
+ }
+}
+
+StreamingCommand Command(): {Token t; StreamingCommand command;}
+{
+ t = <EXECCOMMAND>
+ {
+ String[] argv = splitArgs(unquote(t.image));
+ command = new StreamingCommand(pigContext, argv);
+ checkAutoShipSpecs(command, argv);
+ return command;
+ }
+ |
+ t = <IDENTIFIER>
+ {
+ command = pigContext.getCommandForAlias(t.image);
+ if (command == null) {
+ throw new ParseException("Undefined command-alias: " + t.image +
+ " used as stream operator");
+ }
+
+ return command;
+ }
+}
+
+LogicalOperator DefineClause(LogicalPlan lp) : {Token t; Token cmd; String functionName, functionArgs;}
{
t = <IDENTIFIER>
(
+ (
+ cmd = <EXECCOMMAND>
+ {
+ StreamingCommand command =
+ new StreamingCommand(pigContext, splitArgs(unquote(cmd.image)));
+ String[] paths;
+ StreamingCommand.HandleSpec[] handleSpecs;
+ }
+ (
+ <SHIP> "(" paths = PathList() ")"
+ {
+ if (paths.length == 0) {
+ command.setShipFiles(false);
+ } else {
+ for (String path : paths) {
+ try {
+ command.addPathToShip(path);
+ } catch(IOException e) {
+ throw new ParseException(e.getMessage());
+ }
+ }
+ }
+ }
+ |
+ <CACHE> "(" paths = PathList() ")"
+ {
+ for (String path : paths) {
+ try {
+ command.addPathToCache(path);
+ } catch(IOException e) {
+ throw new ParseException(e.getMessage());
+ }
+ }
+ }
+ |
+ <INPUT> "(" InputOutputSpec(command, StreamingCommand.Handle.INPUT) ")"
+ |
+ <OUTPUT> "(" InputOutputSpec(command, StreamingCommand.Handle.OUTPUT) ")"
+ |
+ <ERROR> "(" ErrorSpec(command, t.image) ")"
+ )*
+ {
+ pigContext.registerStreamCmd(t.image, command);
+ }
+ )
+ |
+ (
functionName = QualifiedFunction() "(" functionArgs = StringList() ")"
{
pigContext.registerFunction(t.image, new FuncSpec(functionName + "(" + functionArgs + ")"));
}
)
+ )
{
// Return the dummy LODefine
LogicalOperator lo = new LODefine(lp, new OperatorKey(scope, getNextId()));
@@ -1534,6 +1801,83 @@
}
}
+String[] PathList() : {Token t; List<String> pathList = new ArrayList<String>();}
+{
+ (
+ (
+ t = <QUOTEDSTRING> {pathList.add(unquote(t.image));}
+ ( "," t = <QUOTEDSTRING> {pathList.add(unquote(t.image));} )*
+ )
+ | {}
+ )
+ {return pathList.toArray(new String[pathList.size()]);}
+}
+
+void InputOutputSpec(StreamingCommand command, StreamingCommand.Handle handle):
+{
+ String stream, deserializer;
+ StreamingCommand.HandleSpec[] handleSpecs;
+ String functionName = "PigStorage", functionArgs="";
+}
+{
+ stream = CommandStream()
+ [
+ <USING> functionName = QualifiedFunction()
+ [
+ "(" functionArgs = StringList() ")"
+ ]
+ ]
+ {
+ deserializer = functionName + "(" + functionArgs + ")";
+ command.addHandleSpec(handle,
+ new HandleSpec(stream, deserializer)
+ );
+ }
+ (
+ ","
+ stream = CommandStream()
+ [
+ <USING> functionName = QualifiedFunction()
+ [
+ "(" functionArgs = StringList() ")"
+ ]
+ ]
+ {
+ deserializer = functionName + "(" + functionArgs + ")";
+ command.addHandleSpec(handle,
+ new HandleSpec(stream, deserializer)
+ );
+ }
+ )*
+}
+
+String CommandStream(): {Token t;}
+{
+ t = <STDIN>
+ {return "stdin";}
+ |
+ t = <STDOUT>
+ {return "stdout";}
+ |
+ t = <QUOTEDSTRING>
+ {return unquote(t.image);}
+}
+
+void ErrorSpec(StreamingCommand command, String alias): {Token t1, t2; int limit = StreamingCommand.MAX_TASKS;}
+{
+ (
+ t1 = <QUOTEDSTRING>
+ (<LIMIT> t2 = <INTEGER> {limit = Integer.parseInt(t2.image);})?
+ {
+ command.setLogDir(unquote(t1.image));
+ command.setLogFilesLimit(limit);
+ }
+ )
+ |
+ {
+ command.setLogDir(alias);
+ }
+}
LogicalOperator StoreClause(LogicalPlan lp) : {LogicalOperator lo; Token t; String fileName; String functionSpec = null;
String functionName, functionArgs;}
{
Added: incubator/pig/branches/types/src/org/apache/pig/impl/streaming/DefaultInputHandler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/streaming/DefaultInputHandler.java?rev=692253&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/streaming/DefaultInputHandler.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/streaming/DefaultInputHandler.java Thu Sep 4 14:25:41 2008
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.streaming;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.pig.StoreFunc;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
+
+/**
+ * {@link DefaultInputHandler} handles the input for the Pig-Streaming
+ * executable in a {@link InputType#SYNCHRONOUS} manner by feeding it input
+ * via its <code>stdin</code>.
+ */
+public class DefaultInputHandler extends InputHandler {
+
+ OutputStream stdin;
+
+ public DefaultInputHandler() {
+ serializer = new PigStorage();
+ }
+
+ public DefaultInputHandler(HandleSpec spec) {
+ serializer = (StoreFunc)PigContext.instantiateFuncFromSpec(spec.spec);
+ }
+
+ public InputType getInputType() {
+ return InputType.SYNCHRONOUS;
+ }
+
+ public void bindTo(OutputStream os) throws IOException {
+ stdin = os;
+ super.bindTo(stdin);
+ }
+
+ @Override
+ public synchronized void close(Process process) throws IOException {
+ if(!alreadyClosed) {
+ alreadyClosed = true;
+ super.close(process);
+ try {
+ stdin.flush();
+ stdin.close();
+ stdin = null;
+ } catch(IOException e) {
+ // check if we got an exception because
+ // the process actually completed and we were
+ // trying to flush and close its stdin
+ if(process == null || process.exitValue() != 0) {
+ // the process had not terminated normally
+ // throw the exception we got
+ throw e;
+ }
+ }
+
+ }
+ }
+}
Added: incubator/pig/branches/types/src/org/apache/pig/impl/streaming/DefaultOutputHandler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/streaming/DefaultOutputHandler.java?rev=692253&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/streaming/DefaultOutputHandler.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/streaming/DefaultOutputHandler.java Thu Sep 4 14:25:41 2008
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.streaming;
+
+import java.io.IOException;
+
+import org.apache.pig.LoadFunc;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
+
+/**
+ * {@link DefaultOutputHandler} handles the output from the Pig-Streaming
+ * executable in an {@link OutputType#SYNCHRONOUS} manner by reading its output
+ * via its <code>stdout</code>.
+ */
+public class DefaultOutputHandler extends OutputHandler {
+ BufferedPositionedInputStream stdout;
+
+ public DefaultOutputHandler() {
+ deserializer = new PigStorage();
+ }
+
+ public DefaultOutputHandler(HandleSpec spec) {
+ deserializer = (LoadFunc)PigContext.instantiateFuncFromSpec(spec.spec);
+ }
+
+ public OutputType getOutputType() {
+ return OutputType.SYNCHRONOUS;
+ }
+
+ public void bindTo(String fileName, BufferedPositionedInputStream is,
+ long offset, long end) throws IOException {
+ stdout = is;
+ super.bindTo(fileName, stdout, offset, end);
+ }
+
+ public synchronized void close() throws IOException {
+ if(!alreadyClosed) {
+ super.close();
+ stdout.close();
+ stdout = null;
+ alreadyClosed = true;
+ }
+ }
+}
Added: incubator/pig/branches/types/src/org/apache/pig/impl/streaming/ExecutableManager.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/streaming/ExecutableManager.java?rev=692253&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/streaming/ExecutableManager.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/streaming/ExecutableManager.java Thu Sep 4 14:25:41 2008
@@ -0,0 +1,624 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.streaming;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.streaming.InputHandler.InputType;
+import org.apache.pig.impl.streaming.OutputHandler.OutputType;
+
+/**
+ * {@link ExecutableManager} manages an external executable which processes data
+ * in a Pig query.
+ *
+ * The <code>ExecutableManager</code> is responsible for startup/teardown of
+ * the external process and also for managing it. It feeds input records to the
+ * executable via its <code>stdin</code>, collects the output records from
+ * the <code>stdout</code> and also diagnostic information from the
+ * <code>stderr</code>.
+ */
+public class ExecutableManager {
+ private static final Log LOG = LogFactory.getLog(ExecutableManager.class
+ .getName());
+ private static final int SUCCESS = 0;
+ private static final String PATH = "PATH";
+ private static final String BASH = "bash";
+ private static final Result EOS_RESULT = new Result(POStatus.STATUS_EOS, null);
+
+ protected StreamingCommand command; // Streaming command to be run
+ String argvAsString; // Parsed commands
+
+ Process process; // Handle to the process
+ protected int exitCode = -127; // Exit code of the process
+
+ protected DataOutputStream stdin; // stdin of the process
+ ProcessInputThread stdinThread; // thread to send input to process
+
+ ProcessOutputThread stdoutThread; // thread to get process stdout
+ InputStream stdout; // stdout of the process
+
+ ProcessErrorThread stderrThread; // thread to get process stderr
+ InputStream stderr; // stderr of the process
+
+ // Input/Output handlers
+ InputHandler inputHandler;
+ OutputHandler outputHandler;
+
+ // Statistics
+ protected long inputRecords = 0;
+ protected long inputBytes = 0;
+ protected long outputRecords = 0;
+ protected long outputBytes = 0;
+
+ protected volatile Throwable outerrThreadsError;
+ private POStream poStream;
+ private ProcessInputThread fileInputThread;
+
+ /**
+ * Create a new {@link ExecutableManager}.
+ */
+ public ExecutableManager() {
+ }
+
+ /**
+ * Configure and initialize the {@link ExecutableManager}.
+ *
+ * @param stream
+ * {@link POStream} operator being configured; it supplies the
+ * {@link StreamingCommand} to be run by the
+ * <code>ExecutableManager</code> (via
+ * <code>stream.getCommand()</code>) and is also handed to
+ * the threads that process the command's input and output
+ * (see {@link ProcessInputThread} and
+ * {@link ProcessOutputThread})
+ * @throws IOException
+ * @throws ExecException
+ */
+ public void configure(POStream stream) throws IOException, ExecException {
+ this.poStream = stream;
+ this.command = stream.getCommand();
+ String[] argv = this.command.getCommandArgs();
+ argvAsString = "";
+ for (String arg : argv) {
+ argvAsString += arg;
+ argvAsString += " ";
+ }
+
+ // Create the input/output handlers
+ this.inputHandler = HandlerFactory.createInputHandler(command);
+ this.outputHandler = HandlerFactory.createOutputHandler(command);
+ }
+
+ /**
+ * Close and cleanup the {@link ExecutableManager}.
+ * @throws IOException
+ */
+ public void close() throws IOException {
+ // Close the InputHandler, which in some cases lets the process
+ // terminate
+ inputHandler.close(process);
+
+ // Check if we need to start the process now ...
+ if (inputHandler.getInputType() == InputType.ASYNCHRONOUS) {
+ exec();
+ }
+
+ // Wait for the process to exit
+ try {
+ exitCode = process.waitFor();
+ } catch (InterruptedException ie) {
+ LOG.error("Unexpected exception while waiting for streaming binary to complete", ie);
+ killProcess(process);
+ }
+
+ // Wait for stdout thread to complete
+ try {
+ if (stdoutThread != null) {
+ stdoutThread.join(0);
+ }
+ stdoutThread = null;
+ } catch (InterruptedException ie) {
+ LOG.error("Unexpected exception while waiting for output thread for streaming binary to complete", ie);
+ killProcess(process);
+ }
+
+ // Wait for stderr thread to complete
+ try {
+ if (stderrThread != null) {
+ stderrThread.join(0);
+ }
+ stderrThread = null;
+ } catch (InterruptedException ie) {
+ LOG.error("Unexpected exception while waiting for input thread for streaming binary to complete", ie);
+ killProcess(process);
+ }
+
+ LOG.debug("Process exited with: " + exitCode);
+ if (exitCode != SUCCESS) {
+ LOG.error(command + " failed with exit status: "
+ + exitCode);
+ }
+
+ if (outputHandler.getOutputType() == OutputType.ASYNCHRONOUS) {
+
+ // Trigger the outputHandler
+ outputHandler.bindTo("", null, 0, -1);
+
+ // start thread to process output from executable's stdout
+ stdoutThread = new ProcessOutputThread(outputHandler, poStream);
+ stdoutThread.start();
+ }
+
+ // Check if there was a problem with the managed process
+ if (outerrThreadsError != null) {
+ LOG.error("Output/Error thread failed with: "
+ + outerrThreadsError);
+ }
+
+ }
+
+ /**
+ * Helper function to close input and output streams
+ * to the process and kill it
+ * @param process the process to be killed
+ * @throws IOException
+ */
+ private void killProcess(Process process) throws IOException {
+ inputHandler.close(process);
+ outputHandler.close();
+ process.destroy();
+ }
+
+ /**
+ * Convert path from Windows convention to Unix convention. Invoked under
+ * cygwin.
+ *
+ * @param path
+ * path in Windows convention
+ * @return path in Unix convention, null if fail
+ */
+ private String parseCygPath(String path) {
+ String[] command = new String[] { "cygpath", "-u", path };
+ Process p = null;
+ try {
+ p = Runtime.getRuntime().exec(command);
+ } catch (IOException e) {
+ return null;
+ }
+ int exitVal = 0;
+ try {
+ exitVal = p.waitFor();
+ } catch (InterruptedException e) {
+ return null;
+ }
+ if (exitVal != 0)
+ return null;
+ String line = null;
+ try {
+ InputStreamReader isr = new InputStreamReader(p.getInputStream());
+ BufferedReader br = new BufferedReader(isr);
+ line = br.readLine();
+ } catch (IOException e) {
+ return null;
+ }
+ return line;
+ }
+
+ /**
+ * Set up the run-time environment of the managed process.
+ *
+ * @param pb
+ * {@link ProcessBuilder} used to exec the process
+ */
+ protected void setupEnvironment(ProcessBuilder pb) {
+ String separator = ":";
+ Map<String, String> env = pb.environment();
+
+ // Add the current-working-directory to the $PATH
+ File dir = pb.directory();
+ String cwd = (dir != null) ? dir.getAbsolutePath() : System
+ .getProperty("user.dir");
+
+ if (System.getProperty("os.name").toUpperCase().startsWith("WINDOWS")) {
+ String unixCwd = parseCygPath(cwd);
+ if (unixCwd == null)
+ throw new RuntimeException(
+ "Can not convert Windows path to Unix path under cygwin");
+ cwd = unixCwd;
+ }
+
+ String envPath = env.get(PATH);
+ if (envPath == null) {
+ envPath = cwd;
+ } else {
+ envPath = envPath + separator + cwd;
+ }
+ env.put(PATH, envPath);
+ }
+
+ /**
+  * Launch the external process.
+  *
+  * The command is run as <code>bash -c "exec ..."</code> with the
+  * environment prepared by {@link #setupEnvironment(ProcessBuilder)}. A
+  * {@link ProcessErrorThread} is started to consume the process'
+  * <code>stderr</code>; when the output handler is synchronous, the process'
+  * <code>stdout</code> is bound to it and a {@link ProcessOutputThread} is
+  * started as well.
+  *
+  * @throws IOException
+  */
+ protected void exec() throws IOException {
+     // 'bash -c exec ...' is the actual command line handed to the OS
+     String[] cmdArgs = new String[] { BASH, "-c", "exec " + argvAsString };
+
+     // Launch it with the prepared environment
+     ProcessBuilder processBuilder = new ProcessBuilder(cmdArgs);
+     setupEnvironment(processBuilder);
+     process = processBuilder.start();
+     LOG.debug("Started the process for command: " + command);
+
+     // stderr is always drained by its own thread
+     stderr = new DataInputStream(new BufferedInputStream(process
+             .getErrorStream()));
+     stderrThread = new ProcessErrorThread();
+     stderrThread.start();
+
+     // Only a synchronous output handler reads the process' stdout directly
+     if (outputHandler.getOutputType() != OutputType.SYNCHRONOUS) {
+         return;
+     }
+
+     stdout = new DataInputStream(new BufferedInputStream(process
+             .getInputStream()));
+     outputHandler.bindTo("", new BufferedPositionedInputStream(stdout),
+             0, Long.MAX_VALUE);
+
+     // Pump tuples from the executable's stdout on a dedicated thread
+     stdoutThread = new ProcessOutputThread(outputHandler, poStream);
+     stdoutThread.start();
+ }
+
+ /**
+ * Start execution of the {@link ExecutableManager}.
+ *
+ * @throws IOException
+ */
+ public void run() throws IOException {
+ // Check if we need to exec the process NOW ...
+ if (inputHandler.getInputType() == InputType.ASYNCHRONOUS) {
+ // start the thread to handle input
+ fileInputThread = new ProcessInputThread(inputHandler, poStream);
+ fileInputThread.start();
+
+ // If Input type is ASYNCHRONOUS that means input to the
+ // streaming binary is from a file - that means we cannot exec
+ // the process till the input file is completely written. This
+ // will be done in close() - so now we return
+ return;
+ }
+
+ // Start the executable ...
+ exec();
+ // set up input to the executable
+ stdin = new DataOutputStream(new BufferedOutputStream(process
+ .getOutputStream()));
+ inputHandler.bindTo(stdin);
+
+ // Start the thread to send input to the executable's stdin
+ stdinThread = new ProcessInputThread(inputHandler, poStream);
+ stdinThread.start();
+ }
+
+ /**
+ * The thread which consumes input from POStream's binaryInput queue
+ * and feeds it to the Process through the {@link InputHandler}.
+ *
+ * The loop ends when a result with <code>POStatus.STATUS_EOP</code> is
+ * taken from the queue, when writing a tuple fails, or when any
+ * <code>Throwable</code> is caught. In the last case the error is stored
+ * in <code>outerrThreadsError</code> and <code>killProcess</code> is
+ * invoked.
+ */
+ class ProcessInputThread extends Thread {
+
+ InputHandler inputHandler;
+ private POStream poStream;
+ private BlockingQueue<Result> binaryInputQueue;
+
+ ProcessInputThread(InputHandler inputHandler, POStream poStream) {
+ setDaemon(true); // daemon thread: does not keep the JVM alive
+ this.inputHandler = inputHandler;
+ this.poStream = poStream;
+ // the input queue from where this thread will read
+ // input tuples
+ this.binaryInputQueue = poStream.getBinaryInputQueue();
+ }
+
+ public void run() {
+ try {
+ // Read tuples from the previous operator in the pipeline
+ // and pass it to the executable
+ while (true) {
+ Result inp = null;
+ inp = binaryInputQueue.take(); // blocks until a Result is available
+ synchronized (poStream) {
+ // notify waiting producer
+ // the if check is to keep "findbugs"
+ // happy
+ if(inp != null)
+ poStream.notifyAll();
+ }
+ // We should receive an EOP only when *ALL* input
+ // for this process has already been sent and no
+ // more input is expected
+ if (inp.returnStatus == POStatus.STATUS_EOP) {
+ // signal cleanup in ExecutableManager
+ close();
+ return;
+ }
+ if (inp.returnStatus == POStatus.STATUS_OK) {
+ // Check if there was a problem with the managed process
+ if (outerrThreadsError != null) {
+ throw new IOException(
+ "Output/Error thread failed with: "
+ + outerrThreadsError);
+ }
+
+ // Pass the serialized tuple to the executable via the
+ // InputHandler
+ Tuple t = null;
+ try {
+ t = (Tuple) inp.result;
+ inputHandler.putNext(t);
+ } catch (IOException e) {
+ // if input type is synchronous then it could
+ // be related to the process terminating
+ if(inputHandler.getInputType() == InputType.SYNCHRONOUS) {
+ LOG.warn("Exception while trying to write to stream binary's input", e);
+ // could be because the process
+ // died OR closed the input stream
+ // we will only call close() here and not
+ // worry about deducing whether the process died
+ // normally or abnormally - if there was any real
+ // issue the ProcessOutputThread should see
+ // a non zero exit code from the process and send
+ // a POStatus.STATUS_ERR back - what if we got
+ // an IOException because there was only an issue with
+ // writing to input of the binary - hmm..hope that means
+ // the process died abnormally!!
+ close();
+ return;
+ } else {
+ // asynchronous case - then this is a real exception
+ LOG.error("Exception while trying to write to stream binary's input", e);
+ // send POStatus.STATUS_ERR to POStream to signal the error
+ // Generally the ProcessOutputThread would do this but now
+ // we should do it here since neither the process nor the
+ // ProcessOutputThread will ever be spawned
+ Result res = new Result(POStatus.STATUS_ERR,
+ "Exception while trying to write to stream binary's input" + e.getMessage());
+ sendOutput(poStream.getBinaryOutputQueue(), res);
+ throw e; // rethrown so the outer catch records it and kills the process
+ }
+ }
+ inputBytes += t.getMemorySize(); // running totals of input size and record count
+ inputRecords++;
+ }
+ }
+ } catch (Throwable t) {
+
+
+ // Note that an error occurred: record it in outerrThreadsError
+ // (this also covers an InterruptedException from take())
+ outerrThreadsError = t;
+ LOG.error(t);
+ try {
+ killProcess(process);
+ } catch (IOException ioe) {
+ LOG.warn(ioe);
+ }
+ }
+ }
+ }
+
+ /**
+  * Put a result on the given output queue and wake up any thread waiting
+  * on the <code>poStream</code> monitor.
+  *
+  * An interruption while waiting for queue space is logged and otherwise
+  * ignored; waiters are notified in either case.
+  *
+  * @param binaryOutputQueue
+  *            queue on which to place the result
+  * @param res
+  *            result to hand over
+  */
+ private void sendOutput(BlockingQueue<Result> binaryOutputQueue, Result res) {
+     try {
+         binaryOutputQueue.put(res);
+     } catch (InterruptedException e) {
+         LOG.error("Error while sending binary output to POStream", e);
+     }
+
+     // the null test only exists to satisfy "findbugs"
+     if (res == null) {
+         return;
+     }
+     synchronized (poStream) {
+         // notify waiting consumer
+         poStream.notifyAll();
+     }
+ }
+
+ /**
+ * The thread which gets output from the streaming binary and puts it onto
+ * the binary output Queue of POStream.
+ *
+ * Tuples are read through <code>outputHandler.getNext()</code> until it
+ * returns <code>null</code>. The thread then waits for the process to
+ * exit and enqueues either <code>EOS_RESULT</code> (exit code 0) or a
+ * result with <code>POStatus.STATUS_ERR</code> (non-zero exit code).
+ * Any <code>Throwable</code> raised while reading is recorded in
+ * <code>outerrThreadsError</code>, reported to POStream as a
+ * <code>STATUS_ERR</code> result, and the process is killed.
+ */
+ class ProcessOutputThread extends Thread {
+
+ OutputHandler outputHandler;
+ private BlockingQueue<Result> binaryOutputQueue;
+
+ ProcessOutputThread(OutputHandler outputHandler, POStream poStream) {
+ setDaemon(true); // daemon thread: does not keep the JVM alive
+ this.outputHandler = outputHandler;
+ // the output queue where this thread will put
+ // output tuples for POStream
+ this.binaryOutputQueue = poStream.getBinaryOutputQueue();
+ }
+
+ public void run() {
+ try {
+ // Read tuples from the executable and send it to
+ // Queue of POStream
+ Tuple tuple = null;
+ while ((tuple = outputHandler.getNext()) != null) {
+ processOutput(tuple);
+ outputBytes += tuple.getMemorySize(); // running total of output size
+ }
+ // output from binary is done
+ processOutput(null);
+ outputHandler.close();
+ } catch (Throwable t) {
+ // Note that an error occurred
+ outerrThreadsError = t;
+ LOG.error("Caught Exception in OutputHandler of Streaming binary, " +
+ "sending error signal to pipeline", t);
+ // send ERROR to POStream
+ try {
+ Result res = new Result();
+ res.result = "Error reading output from Streaming binary:" +
+ "'" + argvAsString + "':" + t.getMessage();
+ res.returnStatus = POStatus.STATUS_ERR;
+ sendOutput(binaryOutputQueue, res);
+ killProcess(process);
+ } catch (Exception e) {
+ LOG.error("Error while trying to signal Error status to pipeline", e);
+ }
+ }
+ }
+
+ void processOutput(Tuple t) { // a null tuple signals the end of the binary's output
+ Result res = new Result();
+
+ if (t != null) {
+ // we have a valid tuple to pass back
+ res.result = t;
+ res.returnStatus = POStatus.STATUS_OK;
+ outputRecords++;
+ } else {
+ // t == null means end of output from
+ // binary - wait for the process to exit
+ // and harvest exit code
+ try {
+ exitCode = process.waitFor();
+ } catch (InterruptedException ie) {
+ try {
+ killProcess(process);
+ } catch (IOException e) {
+ LOG.warn("Exception trying to kill process while processing null output " +
+ "from binary", e);
+
+ }
+ // signal error
+ String errMsg = "Failure while waiting for process (" + argvAsString + ")" +
+ ie.getMessage();
+ LOG.error(errMsg, ie);
+ res.result = errMsg;
+ res.returnStatus = POStatus.STATUS_ERR;
+ sendOutput(binaryOutputQueue, res);
+ return; // the error result has already been sent above
+ }
+ if(exitCode == 0) {
+ // signal EOS (End Of Stream output)
+ res = EOS_RESULT;
+ } else {
+ // signal Error
+
+ String errMsg = "'" + argvAsString + "'" + " failed with exit status: "
+ + exitCode;
+ LOG.error(errMsg);
+ res.result = errMsg;
+ res.returnStatus = POStatus.STATUS_ERR;
+ }
+ }
+ sendOutput(binaryOutputQueue, res);
+
+ }
+ }
+
+
+
+ /**
+ * Workhorse to process the stderr stream of the managed process.
+ *
+ * By default <code>ExecutableManager</code> just sends out the received
+ * error message to the <code>stderr</code> of itself, without appending
+ * a line terminator of its own. The method is protected, so subclasses
+ * may override it to handle the message differently.
+ *
+ * @param error
+ * error message from the managed process.
+ */
+ protected void processError(String error) {
+ // Just send it out to our stderr
+ System.err.print(error);
+ }
+
+ class ProcessErrorThread extends Thread {
+
+ public ProcessErrorThread() {
+ setDaemon(true);
+ }
+
+ public void run() {
+ try {
+ String error;
+ BufferedReader reader = new BufferedReader(
+ new InputStreamReader(stderr));
+ while ((error = reader.readLine()) != null) {
+ processError(error + "\n");
+ }
+
+ if (stderr != null) {
+ stderr.close();
+ LOG.debug("ProcessErrorThread done");
+ }
+ } catch (Throwable t) {
+ // Note that an error occurred
+ outerrThreadsError = t;
+
+ LOG.error(t);
+ try {
+ if (stderr != null) {
+ stderr.close();
+ }
+ } catch (IOException ioe) {
+ LOG.warn(ioe);
+ }
+ throw new RuntimeException(t);
+ }
+ }
+ }
+
+}
Added: incubator/pig/branches/types/src/org/apache/pig/impl/streaming/FileInputHandler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/streaming/FileInputHandler.java?rev=692253&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/streaming/FileInputHandler.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/streaming/FileInputHandler.java Thu Sep 4 14:25:41 2008
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.streaming;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.pig.StoreFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
+
+/**
+ * {@link FileInputHandler} handles the input for the Pig-Streaming
+ * executable in an {@link InputType#ASYNCHRONOUS} manner by feeding it input
+ * via an external file specified by the user.
+ */
+public class FileInputHandler extends InputHandler {
+
+ String fileName;
+ OutputStream fileOutStream;
+
+ public FileInputHandler(HandleSpec handleSpec) throws ExecException {
+ fileName = handleSpec.name;
+ serializer =
+ (StoreFunc) PigContext.instantiateFuncFromSpec(handleSpec.spec);
+
+ try {
+ fileOutStream = new FileOutputStream(new File(fileName));
+ super.bindTo(fileOutStream);
+ } catch (IOException fnfe) {
+ throw new ExecException(fnfe);
+ }
+ }
+
+ public InputType getInputType() {
+ return InputType.ASYNCHRONOUS;
+ }
+
+ public void bindTo(OutputStream os) throws IOException {
+ throw new UnsupportedOperationException("Cannot call bindTo on " +
+ "FileInputHandler");
+ }
+
+ public synchronized void close(Process process) throws IOException {
+ if(!alreadyClosed) {
+ super.close(process);
+ fileOutStream.flush();
+ fileOutStream.close();
+ fileOutStream = null;
+ alreadyClosed = true;
+ }
+ }
+
+}
Added: incubator/pig/branches/types/src/org/apache/pig/impl/streaming/FileOutputHandler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/streaming/FileOutputHandler.java?rev=692253&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/streaming/FileOutputHandler.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/streaming/FileOutputHandler.java Thu Sep 4 14:25:41 2008
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.streaming;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+
+import org.apache.pig.LoadFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
+
+/**
+ * {@link FileOutputHandler} handles the output from the Pig-Streaming
+ * executable in an {@link OutputType#ASYNCHRONOUS} manner by reading it from
+ * an external file specified by the user.
+ */
+public class FileOutputHandler extends OutputHandler {
+
+ String fileName;
+ BufferedPositionedInputStream fileInStream;
+
+ public FileOutputHandler(HandleSpec handleSpec) throws ExecException {
+ fileName = handleSpec.name;
+ deserializer =
+ (LoadFunc) PigContext.instantiateFuncFromSpec(handleSpec.spec);
+ }
+
+ public OutputType getOutputType() {
+ return OutputType.ASYNCHRONOUS;
+ }
+
+ public void bindTo(String fileName, BufferedPositionedInputStream is,
+ long offset, long end) throws IOException {
+ // This is a trigger to start processing the output from the file ...
+ // ... however, we must ignore the input parameters and use ones
+ // provided during initialization
+ File file = new File(this.fileName);
+ this.fileInStream =
+ new BufferedPositionedInputStream(new FileInputStream(file));
+ super.bindTo(this.fileName, this.fileInStream, 0, file.length());
+ }
+
+ public synchronized void close() throws IOException {
+ if(!alreadyClosed) {
+ super.close();
+ fileInStream.close();
+ fileInStream = null;
+ alreadyClosed = true;
+ }
+ }
+
+}
Added: incubator/pig/branches/types/src/org/apache/pig/impl/streaming/HandlerFactory.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/streaming/HandlerFactory.java?rev=692253&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/streaming/HandlerFactory.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/streaming/HandlerFactory.java Thu Sep 4 14:25:41 2008
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.streaming;
+
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.impl.streaming.StreamingCommand.Handle;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
+
+/**
+ * Factory to create an {@link InputHandler} or {@link OutputHandler}
+ * depending on the specification of the {@link StreamingCommand}.
+ *
+ * Only the first handle specification of each kind is consulted; when there
+ * is none, the default (stdin/stdout based) handler is returned.
+ */
+public class HandlerFactory {
+
+    /**
+     * Create an <code>InputHandler</code> for the given input specification
+     * of the <code>StreamingCommand</code>.
+     *
+     * @param command <code>StreamingCommand</code>
+     * @return a <code>DefaultInputHandler</code> when no input is specified
+     *         or the first input is named "stdin", otherwise a
+     *         <code>FileInputHandler</code>
+     * @throws ExecException
+     */
+    public static InputHandler createInputHandler(StreamingCommand command)
+    throws ExecException {
+        List<HandleSpec> inputSpecs = command.getHandleSpecs(Handle.INPUT);
+
+        // An empty list is treated like a missing one rather than failing
+        // on get(0)
+        HandleSpec in = null;
+        if (inputSpecs == null || inputSpecs.isEmpty()
+                || (in = inputSpecs.get(0)) == null) {
+            return new DefaultInputHandler();
+        }
+
+        return (in.name.equals("stdin")) ? new DefaultInputHandler(in) :
+                                           new FileInputHandler(in);
+    }
+
+    /**
+     * Create an <code>OutputHandler</code> for the given output specification
+     * of the <code>StreamingCommand</code>.
+     *
+     * @param command <code>StreamingCommand</code>
+     * @return a <code>DefaultOutputHandler</code> when no output is
+     *         specified or the first output is named "stdout", otherwise a
+     *         <code>FileOutputHandler</code>
+     * @throws ExecException
+     */
+    public static OutputHandler createOutputHandler(StreamingCommand command)
+    throws ExecException {
+        List<HandleSpec> outputSpecs = command.getHandleSpecs(Handle.OUTPUT);
+
+        // An empty list is treated like a missing one rather than failing
+        // on get(0)
+        HandleSpec out = null;
+        if (outputSpecs == null || outputSpecs.isEmpty()
+                || (out = outputSpecs.get(0)) == null) {
+            return new DefaultOutputHandler();
+        }
+
+        return (out.name.equals("stdout")) ? new DefaultOutputHandler(out) :
+                                             new FileOutputHandler(out);
+    }
+}
Added: incubator/pig/branches/types/src/org/apache/pig/impl/streaming/InputHandler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/streaming/InputHandler.java?rev=692253&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/streaming/InputHandler.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/streaming/InputHandler.java Thu Sep 4 14:25:41 2008
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.streaming;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.apache.pig.StoreFunc;
+import org.apache.pig.data.Tuple;
+
+/**
+ * Base class for everything that feeds input to the Pig-Streaming external
+ * command.
+ *
+ * Input reaches the managed executable either in a
+ * {@link InputType#SYNCHRONOUS} manner through its <code>stdin</code>, or in
+ * an {@link InputType#ASYNCHRONOUS} manner through an external file that the
+ * executable reads afterwards.
+ */
+public abstract class InputHandler {
+    /**
+     * How input is delivered to the managed executable.
+     */
+    public enum InputType {SYNCHRONOUS, ASYNCHRONOUS}
+
+    /*
+     * Serializer through which tuples are written for the managed process.
+     *
+     * Concrete sub-classes are responsible for creating and managing it.
+     */
+    protected StoreFunc serializer;
+
+    // set once close() has run to completion
+    protected boolean alreadyClosed = false;
+
+    /**
+     * Tell which <code>InputType</code> this handler implements.
+     *
+     * @return the handled <code>InputType</code>
+     */
+    public abstract InputType getInputType();
+
+    /**
+     * Hand one input <code>Tuple</code> to the serializer.
+     *
+     * @param t input <code>Tuple</code>
+     * @throws IOException
+     */
+    public void putNext(Tuple t) throws IOException {
+        serializer.putNext(t);
+    }
+
+    /**
+     * Finish the serializer because no more input will be sent to the
+     * managed process. Only the first successful call has any effect.
+     *
+     * @param process the managed process, supplied by the ExecutableManager
+     * so that overriding implementations can inspect it. It may be null,
+     * e.g. when input goes through a file and the process has not been
+     * exec'ed yet, so overriding implementations must check that it is
+     * usable before touching it.
+     *
+     * @throws IOException
+     */
+    public synchronized void close(Process process) throws IOException {
+        if (alreadyClosed) {
+            return;
+        }
+        serializer.finish();
+        alreadyClosed = true;
+    }
+
+    /**
+     * Bind the serializer of this <code>InputHandler</code> to the given
+     * <code>OutputStream</code>.
+     *
+     * @param os <code>OutputStream</code> handed to the serializer
+     * @throws IOException
+     */
+    public void bindTo(OutputStream os) throws IOException {
+        serializer.bindTo(os);
+    }
+}
Added: incubator/pig/branches/types/src/org/apache/pig/impl/streaming/OutputHandler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/streaming/OutputHandler.java?rev=692253&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/streaming/OutputHandler.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/streaming/OutputHandler.java Thu Sep 4 14:25:41 2008
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.streaming;
+
+import java.io.IOException;
+
+import org.apache.pig.LoadFunc;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+
+/**
+ * {@link OutputHandler} is responsible for handling the output of the
+ * Pig-Streaming external command.
+ *
+ * The output of the managed executable could be fetched in a
+ * {@link OutputType#SYNCHRONOUS} manner via its <code>stdout</code> or in an
+ * {@link OutputType#ASYNCHRONOUS} manner via an external file to which the
+ * process wrote its output.
+ */
+public abstract class OutputHandler {
+ public enum OutputType {SYNCHRONOUS, ASYNCHRONOUS}
+
+ /*
+ * The deserializer used to read the output data of the managed process
+ * (see getNext()).
+ *
+ * It is the responsibility of the concrete sub-classes to setup and
+ * manage the deserializer.
+ */
+ protected LoadFunc deserializer;
+
+ /**
+ * Get the handled <code>OutputType</code>.
+ * @return the handled <code>OutputType</code>
+ */
+ public abstract OutputType getOutputType();
+
+ // flag to mark if close() has already been called; the base close() does
+ // not touch it, so sub-classes maintain it themselves
+ protected boolean alreadyClosed = false;
+
+ /**
+ * Bind the <code>OutputHandler</code> to the <code>InputStream</code>
+ * from which to read the output data of the managed process.
+ *
+ * The stream is wrapped in a new
+ * <code>BufferedPositionedInputStream</code> and passed, together with
+ * the remaining arguments, to the deserializer's <code>bindTo</code>.
+ *
+ * @param fileName name passed through to the deserializer
+ * @param is <code>InputStream</code> from which to read the output data
+ * of the managed process
+ * @param offset offset passed through to the deserializer
+ * @param end end position passed through to the deserializer
+ * @throws IOException
+ */
+ public void bindTo(String fileName, BufferedPositionedInputStream is,
+ long offset, long end) throws IOException {
+ deserializer.bindTo(fileName, new BufferedPositionedInputStream(is),
+ offset, end);
+ }
+
+ /**
+ * Get the next output <code>Tuple</code> of the managed process, as
+ * returned by the deserializer.
+ *
+ * @return the next output <code>Tuple</code> of the managed process
+ * @throws IOException
+ */
+ public Tuple getNext() throws IOException {
+ return deserializer.getNext();
+ }
+
+ /**
+ * Close the <code>OutputHandler</code>. The base implementation does
+ * nothing.
+ * @throws IOException
+ */
+ public synchronized void close() throws IOException {}
+}
Added: incubator/pig/branches/types/src/org/apache/pig/impl/streaming/StreamingCommand.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/streaming/StreamingCommand.java?rev=692253&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/streaming/StreamingCommand.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/streaming/StreamingCommand.java Thu Sep 4 14:25:41 2008
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.streaming;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
+
+
+/**
+ * {@link StreamingCommand} represents the specification of an external
+ * command to be executed in a Pig Query.
+ *
+ * <code>StreamingCommand</code> encapsulates all relevant details of the
+ * command specified by the user either directly via the <code>STREAM</code>
+ * operator or indirectly via a <code>DEFINE</code> operator. It includes
+ * details such as input/output/error specifications and also files to be
+ * shipped to the cluster and files to be cached.
+ */
+public class StreamingCommand implements Serializable, Cloneable {
+ private static final long serialVersionUID = 1L;
+
+ // External command to be executed and its parsed components
+ String executable;
+ String[] argv;
+
+ // Files to be shipped to the cluster in-order to be executed
+ List<String> shipSpec = new LinkedList<String>();
+
+ // Files to be cached on the execute nodes of the cluster
+ List<String> cacheSpec = new LinkedList<String>();
+
+ /**
+ * Handle to communicate with the external process.
+ */
+ public enum Handle {INPUT, OUTPUT}
+
+ /**
+ * Map from each {@link Handle} (input/output) to the list of its
+ * specifications
+ */
+ Map<Handle, List<HandleSpec>> handleSpecs =
+ new TreeMap<Handle, List<HandleSpec>>();
+
+ // Should the stderr of the process be persisted?
+ boolean persistStderr = false;
+
+ // Directory where the process's stderr logs should be persisted.
+ String logDir;
+
+ // Limit on the number of persisted log-files
+ int logFilesLimit = 100;
+ public static final int MAX_TASKS = 100;
+
+ boolean shipFiles = true;
+
+ private PigContext pigContext;
+
+ /**
+ * Create a new <code>StreamingCommand</code> from its parsed command line.
+ *
+ * The first element of <code>argv</code> is taken to be the executable, so
+ * <code>argv</code> must hold at least one element.
+ *
+ * @param pigContext <code>PigContext</code> kept by this command; within
+ * this class it is used when validating cache
+ * specifications
+ * @param argv parsed arguments of the command, executable first
+ */
+ public StreamingCommand(PigContext pigContext, String[] argv) {
+ this.argv = argv;
+ this.pigContext = pigContext;
+
+ // By convention the executable is the first parsed argument
+ this.executable = argv[0];
+ }
+
+ /**
+ * Return the executable of this command.
+ *
+ * This is <code>argv[0]</code> as captured by the constructor, unless it
+ * was replaced through {@link #setExecutable(String)}.
+ *
+ * @return the executable to be run
+ */
+ public String getExecutable() {
+ return this.executable;
+ }
+
+ /**
+ * Set the executable for the <code>StreamingCommand</code>.
+ *
+ * Only the executable is replaced; the stored command line arguments are
+ * left untouched.
+ *
+ * @param executable the executable for the <code>StreamingCommand</code>
+ */
+ public void setExecutable(String executable) {
+ this.executable = executable;
+ }
+
+ /**
+ * Set the command line arguments for the <code>StreamingCommand</code>.
+ *
+ * The array is stored by reference (no copy is made) and the executable
+ * is not re-derived from it.
+ *
+ * @param argv the command line arguments for the
+ * <code>StreamingCommand</code>
+ */
+ public void setCommandArgs(String[] argv) {
+ this.argv = argv;
+ }
+
+ /**
+ * Return the parsed arguments of the command.
+ *
+ * The internal array itself is handed out, not a copy.
+ *
+ * @return the parsed command arguments as <code>String[]</code>
+ */
+ public String[] getCommandArgs() {
+ return this.argv;
+ }
+
+ /**
+ * Return the files registered for shipping to the cluster.
+ *
+ * The internal list itself is handed out, not a copy.
+ *
+ * @return the list of files which need to be shipped to the cluster
+ */
+ public List<String> getShipSpecs() {
+ return this.shipSpec;
+ }
+
+ /**
+ * Return the files registered for caching on the execute nodes.
+ *
+ * The internal list itself is handed out, not a copy.
+ *
+ * @return the list of files which need to be cached on the execute nodes
+ */
+ public List<String> getCacheSpecs() {
+ return this.cacheSpec;
+ }
+
+ /**
+ * Register a file to be shipped to the cluster.
+ *
+ * Users can use this to distribute executables and other necessary files
+ * to the clusters. The path is checked through {@link File}: it has to
+ * exist and must not be a directory.
+ *
+ * @param path path of the file to be shipped to the cluster
+ * @throws IOException if <code>path</code> does not exist or is a directory
+ */
+ public void addPathToShip(String path) throws IOException {
+ File candidate = new File(path);
+
+ // Reject anything that is missing ...
+ if (!candidate.exists()) {
+ throw new IOException("Invalid ship specification: '" + path +
+ "' does not exist!");
+ }
+
+ // ... or that is a directory rather than a file
+ if (candidate.isDirectory()) {
+ throw new IOException("Invalid ship specification: '" + path +
+ "' is a directory and can't be shipped!");
+ }
+
+ shipSpec.add(path);
+ }
+
+ /**
+ * Add a file to be cached on execute nodes on the cluster. The file is
+ * assumed to be available at the shared filesystem.
+ *
+ * @param path path of the file to be cached on the execute nodes
+ */
+ public void addPathToCache(String path) throws IOException {
+ // Validate
+ URI pathUri = null;
+ URI dfsPath = null;
+ try {
+ pathUri = new URI(path);
+
+ // Strip away the URI's _fragment_ and _query_
+ dfsPath = new URI(pathUri.getScheme(), pathUri.getAuthority(),
+ pathUri.getPath(), null, null);
+ } catch (URISyntaxException urise) {
+ throw new IOException("Invalid cache specification: " + path);
+ }
+
+ boolean exists = false;
+ try {
+ exists = FileLocalizer.fileExists(dfsPath.toString(), pigContext);
+ } catch (IOException ioe) {
+ // Throw a better error message...
+ throw new IOException("Invalid cache specification: '" + dfsPath +
+ "' does not exist!");
+ }
+
+ if (!exists) {
+ throw new IOException("Invalid cache specification: '" + dfsPath +
+ "' does not exist!");
+ } else if (FileLocalizer.isDirectory(dfsPath.toString(), pigContext)) {
+ throw new IOException("Invalid cache specification: '" + dfsPath +
+ "' is a directory and can't be cached!");
+ }
+
+ cacheSpec.add(path);
+ }
+
+ /**
+ * Append a {@link HandleSpec} to the specifications of a {@link Handle}.
+ *
+ * The list for the handle is created on first use; later calls append to
+ * the same list.
+ *
+ * @param handle <code>Handle</code> to which the specification is to
+ * be attached.
+ * @param handleSpec <code>HandleSpec</code> for the given handle.
+ */
+ public void addHandleSpec(Handle handle, HandleSpec handleSpec) {
+ List<HandleSpec> specsForHandle = handleSpecs.get(handle);
+
+ // First specification for this handle: start a new list for it
+ if (specsForHandle == null) {
+ specsForHandle = new LinkedList<HandleSpec>();
+ handleSpecs.put(handle, specsForHandle);
+ }
+
+ specsForHandle.add(handleSpec);
+ }
+
+ /**
+ * Set the input specification for the <code>StreamingCommand</code>.
+ *
+ * @param spec input specification
+ */
+ public void setInputSpec(HandleSpec spec) {
+ List<HandleSpec> inputSpecs = getHandleSpecs(Handle.INPUT);
+ if (inputSpecs == null || inputSpecs.size() == 0) {
+ addHandleSpec(Handle.INPUT, spec);
+ } else {
+ inputSpecs.set(0, spec);
+ }
+ }
+
+ /**
+ * Get the input specification of the <code>StreamingCommand</code>.
+ *
+ * @return input specification of the <code>StreamingCommand</code>
+ */
+ public HandleSpec getInputSpec() {
+ List<HandleSpec> inputSpecs = getHandleSpecs(Handle.INPUT);
+ if (inputSpecs == null || inputSpecs.size() == 0) {
+ addHandleSpec(Handle.INPUT, new HandleSpec("stdin", PigStorage.class.getName()));
+ }
+ return getHandleSpecs(Handle.INPUT).get(0);
+ }
+
+ /**
+ * Set the specification for the primary output of the
+ * <code>StreamingCommand</code>.
+ *
+ * @param spec specification for the primary output of the
+ * <code>StreamingCommand</code>
+ */
+ public void setOutputSpec(HandleSpec spec) {
+ List<HandleSpec> outputSpecs = getHandleSpecs(Handle.OUTPUT);
+ if (outputSpecs == null || outputSpecs.size() == 0) {
+ addHandleSpec(Handle.OUTPUT, spec);
+ } else {
+ outputSpecs.set(0, spec);
+ }
+ }
+
+ /**
+ * Get the specification of the primary output of the
+ * <code>StreamingCommand</code>.
+ *
+ * @return specification of the primary output of the
+ * <code>StreamingCommand</code>
+ */
+ public HandleSpec getOutputSpec() {
+ List<HandleSpec> outputSpecs = getHandleSpecs(Handle.OUTPUT);
+ if (outputSpecs == null || outputSpecs.size() == 0) {
+ addHandleSpec(Handle.OUTPUT, new HandleSpec("stdout", PigStorage.class.getName()));
+ }
+ return getHandleSpecs(Handle.OUTPUT).get(0);
+ }
+
+ /**
+ * Look up the specifications registered for the given <code>Handle</code>.
+ *
+ * The internal list itself is handed out, not a copy.
+ *
+ * @param handle <code>Handle</code> of the stream
+ * @return the specifications for the given <code>Handle</code>, or
+ * <code>null</code> if none has been registered for it
+ */
+ public List<HandleSpec> getHandleSpecs(Handle handle) {
+ List<HandleSpec> registered = handleSpecs.get(handle);
+ return registered;
+ }
+
+ /**
+ * Tell whether the stderr of the managed process is to be persisted.
+ *
+ * @return <code>true</code> if the stderr of the managed process should be
+ * persisted, <code>false</code> otherwise.
+ */
+ public boolean getPersistStderr() {
+ return this.persistStderr;
+ }
+
+ /**
+ * Specify if the stderr of the managed process should be persisted.
+ *
+ * Note that {@link #setLogDir(String)} also switches this flag on.
+ *
+ * @param persistStderr <code>true</code> if the stderr of the managed
+ * process should be persisted, else <code>false</code>
+ */
+ public void setPersistStderr(boolean persistStderr) {
+ this.persistStderr = persistStderr;
+ }
+
+ /**
+ * Return the directory in which the log-files of the command are
+ * persisted.
+ *
+ * @return the log directory as stored by {@link #setLogDir(String)}, or
+ * <code>null</code> if it was never set
+ */
+ public String getLogDir() {
+ return this.logDir;
+ }
+
+ /**
+ * Set the directory where the log-files of the command are persisted.
+ *
+ * @param logDir the directory where the log-files of the command are persisted
+ */
+ public void setLogDir(String logDir) {
+ this.logDir = logDir;
+ if (this.logDir.startsWith("/")) {
+ this.logDir = this.logDir.substring(1);
+ }
+ setPersistStderr(true);
+ }
+
+ /**
+ * Return the maximum number of tasks whose stderr logs files are
+ * persisted.
+ *
+ * @return the maximum number of tasks whose stderr logs files are persisted
+ */
+ public int getLogFilesLimit() {
+ return this.logFilesLimit;
+ }
+
+ /**
+ * Set the maximum number of tasks whose stderr logs files are persisted.
+ * @param logFilesLimit the maximum number of tasks whose stderr logs files
+ * are persisted
+ */
+ public void setLogFilesLimit(int logFilesLimit) {
+ this.logFilesLimit = Math.min(MAX_TASKS, logFilesLimit);
+ }
+
+ /**
+ * Set whether files should be shipped or not.
+ *
+ * Only the flag is updated; the list of ship specifications is not
+ * modified.
+ *
+ * @param shipFiles <code>true</code> if files of this command should be
+ * shipped, <code>false</code> otherwise
+ */
+ public void setShipFiles(boolean shipFiles) {
+ this.shipFiles = shipFiles;
+ }
+
+ /**
+ * Tell whether files for this command should be shipped or not.
+ *
+ * @return <code>true</code> if files of this command should be shipped,
+ * <code>false</code> otherwise
+ */
+ public boolean getShipFiles() {
+ return this.shipFiles;
+ }
+
+ /**
+ * Render the command as its arguments, each followed by a space, and then
+ * <code>(inputSpec/outputSpec)</code>.
+ *
+ * Because the specifications are obtained through {@link #getInputSpec()}
+ * and {@link #getOutputSpec()}, the default specifications are registered
+ * if none were set before.
+ *
+ * @return textual form of this command
+ */
+ @Override
+ public String toString() {
+ // Purely local buffer, so the unsynchronized StringBuilder suffices
+ StringBuilder sb = new StringBuilder();
+ for (String arg : getCommandArgs()) {
+ sb.append(arg);
+ sb.append(" ");
+ }
+ sb.append("(").append(getInputSpec().toString());
+ sb.append("/").append(getOutputSpec()).append(")");
+
+ return sb.toString();
+ }
+
+ /**
+ * Create a copy of this command that can be modified independently.
+ *
+ * The argument array, the ship and cache lists and the handle
+ * specifications (including every {@link HandleSpec}) are copied. All
+ * remaining fields, among them the <code>PigContext</code> reference, are
+ * taken over as-is from the shallow <code>super.clone()</code>.
+ *
+ * @return the copy, as a <code>StreamingCommand</code>
+ */
+ @Override
+ public Object clone() {
+ try {
+ StreamingCommand clone = (StreamingCommand)super.clone();
+
+ // super.clone() shares the array; give the copy its own so that
+ // changing an argument of one command does not affect the other
+ if (argv != null) {
+ clone.argv = argv.clone();
+ }
+
+ clone.shipSpec = new ArrayList<String>(shipSpec);
+ clone.cacheSpec = new ArrayList<String>(cacheSpec);
+
+ // Same sorted map type as the field initializer, so the copy
+ // iterates its handles in the same order as the original
+ clone.handleSpecs = new TreeMap<Handle, List<HandleSpec>>();
+ for (Map.Entry<Handle, List<HandleSpec>> e : handleSpecs.entrySet()) {
+ List<HandleSpec> values = new ArrayList<HandleSpec>();
+ for (HandleSpec spec : e.getValue()) {
+ values.add((HandleSpec)spec.clone());
+ }
+ clone.handleSpecs.put(e.getKey(), values);
+ }
+
+ return clone;
+ } catch (CloneNotSupportedException cnse) {
+ // Shouldn't happen since we do implement Cloneable
+ throw new InternalError(cnse.toString());
+ }
+ }
+
+
+ /**
+ * Specification about the usage of the {@link Handle} to communicate
+ * with the external process.
+ *
+ * It specifies the stream-handle which can be one of <code>stdin</code>/
+ * <code>stdout</code>/<code>stderr</code> or a named file and also the
+ * serializer/deserializer specification to be used to read/write data
+ * to/from the stream.
+ */
+ public static class HandleSpec
+ implements Comparable<HandleSpec>, Serializable, Cloneable {
+ private static final long serialVersionUID = 1L;
+
+ String name;
+ String spec;
+
+ /**
+ * Create a new {@link HandleSpec} with a given name using the default
+ * {@link PigStorage} serializer/deserializer.
+ *
+ * @param handleName name of the handle (one of <code>stdin</code>,
+ * <code>stdout</code> or a file-path)
+ */
+ public HandleSpec(String handleName) {
+ this(handleName, PigStorage.class.getName());
+ }
+
+ /**
+ * Create a new {@link HandleSpec} with a given name using the default
+ * {@link PigStorage} serializer/deserializer.
+ *
+ * @param handleName name of the handle (one of <code>stdin</code>,
+ * <code>stdout</code> or a file-path)
+ * @param spec serializer/deserializer spec
+ */
+ public HandleSpec(String handleName, String spec) {
+ this.name = handleName;
+ this.spec = spec;
+ }
+
+ public int compareTo(HandleSpec o) {
+ return this.name.compareTo(o.name);
+ }
+
+ public String toString() {
+ return name + "-" + spec;
+ }
+
+ /**
+ * Get the <b>name</b> of the <code>HandleSpec</code>.
+ *
+ * @return the <b>name</b> of the <code>HandleSpec</code> (one of
+ * <code>stdin</code>, <code>stdout</code> or a file-path)
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Set the <b>name</b> of the <code>HandleSpec</code>.
+ *
+ * @param name <b>name</b> of the <code>HandleSpec</code> (one of
+ * <code>stdin</code>, <code>stdout</code> or a file-path)
+ */
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ /**
+ * Get the serializer/deserializer spec of the <code>HandleSpec</code>.
+ *
+ * @return the serializer/deserializer spec of the
+ * <code>HandleSpec</code>
+ */
+ public String getSpec() {
+ return spec;
+ }
+
+ /**
+ * Set the serializer/deserializer spec of the <code>HandleSpec</code>.
+ *
+ * @param spec the serializer/deserializer spec of the
+ * <code>HandleSpec</code>
+ */
+ public void setSpec(String spec) {
+ this.spec = spec;
+ }
+
+ public boolean equals(Object obj) {
+ HandleSpec other = (HandleSpec)obj;
+ return (name.equals(other.name) && spec.equals(other.spec));
+ }
+
+
+ public Object clone() {
+ try {
+ return super.clone();
+ } catch (CloneNotSupportedException cnse) {
+ // Shouldn't happen since we do implement Clonable
+ throw new InternalError(cnse.toString());
+ }
+ }
+ }
+}
\ No newline at end of file
Modified: incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java?rev=692253&r1=692252&r2=692253&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java Thu Sep 4 14:25:41 2008
@@ -17,6 +17,7 @@
*/
package org.apache.pig.tools.grunt;
+import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
@@ -172,6 +173,15 @@
//mPigServer.setJobName(unquote(value));
mPigServer.setJobName(value);
}
+ else if (key.equals("stream.skippath")) {
+ // Validate
+ File file = new File(value);
+ if (!file.exists() || file.isDirectory()) {
+ throw new IOException("Invalid value for stream.skippath:" +
+ value);
+ }
+ mPigServer.addPathToSkip(value);
+ }
else
{
// other key-value pairs can go there