You are viewing a plain-text version of this content; the hyperlink to its canonical version was lost in the conversion to plain text.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/05/02 22:06:31 UTC
svn commit: r652887 - in /incubator/pig/trunk: CHANGES.txt
src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
test/org/apache/pig/test/TestStreaming.java
Author: olga
Date: Fri May 2 13:06:31 2008
New Revision: 652887
URL: http://svn.apache.org/viewvc?rev=652887&view=rev
Log:
PIG-228: make multiple streaming outputs adhere to spec
Modified:
incubator/pig/trunk/CHANGES.txt
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java
Modified: incubator/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=652887&r1=652886&r2=652887&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Fri May 2 13:06:31 2008
@@ -260,3 +260,5 @@
PIG-226: fix for streaming optimization bug (acmurthy via olgan)
+ PIG-228: make multiple streaming outputs adhere to spec (acmurthy via olgan)
+
Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java?rev=652887&r1=652886&r2=652887&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java Fri May 2 13:06:31 2008
@@ -19,6 +19,7 @@
import java.io.File;
import java.io.IOException;
+import java.text.NumberFormat;
import java.util.Date;
import java.util.List;
import java.util.Properties;
@@ -45,6 +46,16 @@
* of the managed process and also persists the logs of the tasks on HDFS.
*/
public class HadoopExecutableManager extends ExecutableManager {
+ // Formats the part-<partition> file name, similar to Hadoop's own task
+ // outputs. Secondary streaming outputs are copied to
+ // <scriptOutputDir>/<fileName>/part-NNNNN (see the copy loop below).
+ //
+ // The formatter is pinned to Locale.US so the digits (and any minus sign)
+ // are always ASCII. The default-locale instance can emit native digits in
+ // some locales, which would make HDFS file names depend on the locale of
+ // whichever node ran the task.
+ //
+ // NOTE(review): NumberFormat instances are not synchronized; this shared
+ // static instance assumes getOutputName is never called concurrently --
+ // confirm, or guard the format call if that ever changes.
+ private static final NumberFormat NUMBER_FORMAT =
+ NumberFormat.getInstance(java.util.Locale.US);
+ static {
+ // Zero-pad to five digits ("part-00003") and never insert separators.
+ NUMBER_FORMAT.setMinimumIntegerDigits(5);
+ NUMBER_FORMAT.setGroupingUsed(false);
+ }
+
+ /**
+ * Returns the Hadoop-style output file name for a task partition.
+ *
+ * @param partition the task's partition number, e.g. 3
+ * @return {@code "part-"} followed by the partition zero-padded to at
+ * least five ASCII digits, e.g. {@code "part-00003"}. A negative
+ * value keeps its sign, so the {@code -1} fallback used when
+ * {@code mapred.task.partition} is unset yields {@code "part--00001"}.
+ */
+ static String getOutputName(int partition) {
+ return "part-" + NUMBER_FORMAT.format(partition);
+ }
JobConf job;
@@ -122,10 +133,13 @@
for (int i=1; i < outputSpecs.size(); ++i) {
String fileName = outputSpecs.get(i).getName();
try {
+ int partition = job.getInt("mapred.task.partition", -1);
fs.copyFromLocalFile(false, true, new Path(fileName),
- new Path(scriptOutputDir,
- taskId+"-"+fileName)
- );
+ new Path(
+ new Path(scriptOutputDir,
+ fileName),
+ getOutputName(partition))
+ );
} catch (IOException ioe) {
System.err.println("Failed to save secondary output '" +
fileName + "' of task: " + taskId +
Modified: incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java?rev=652887&r1=652886&r2=652887&view=diff
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java (original)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java Fri May 2 13:06:31 2008
@@ -324,9 +324,9 @@
"open(OUTFILE, \">\", $ARGV[0]) or die \"Can't open \".$ARGV[1].\"!: $!\";",
"open(OUTFILE2, \">\", $ARGV[1]) or die \"Can't open \".$ARGV[2].\"!: $!\";",
"while (<STDIN>) {",
- " print OUTFILE \"A,10\n\";",
+ " print OUTFILE \"$_\n\";",
" print STDERR \"STDERR: $_\n\";",
- " print OUTFILE2 \"Secondary Output: $_\n\";",
+ " print OUTFILE2 \"A,10\n\";",
"}",
};
File command = Util.createInputFile("script", "pl", script);
@@ -354,7 +354,8 @@
pigServer.deleteFile(output);
pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
- InputStream op = FileLocalizer.open(output, pigServer.getPigContext());
+ InputStream op = FileLocalizer.open(output+"/bar",
+ pigServer.getPigContext());
PigStorage ps = new PigStorage(",");
ps.bindTo("", new BufferedPositionedInputStream(op), 0, Long.MAX_VALUE);
List<Tuple> outputs = new ArrayList<Tuple>();
@@ -388,7 +389,7 @@
" chomp $_;",
" print OUTFILE \"$_\n\";",
" print STDERR \"STDERR: $_\n\";",
- " print OUTFILE2 \"Secondary Output: $_\n\";",
+ " print OUTFILE2 \"$_\n\";",
"}",
};
File command = Util.createInputFile("script", "pl", script);
@@ -417,7 +418,8 @@
pigServer.deleteFile(output);
pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
- InputStream op = FileLocalizer.open(output, pigServer.getPigContext());
+ InputStream op = FileLocalizer.open(output+"/foobar",
+ pigServer.getPigContext());
PigStorage ps = new PigStorage(",");
ps.bindTo("", new BufferedPositionedInputStream(op), 0, Long.MAX_VALUE);
List<Tuple> outputs = new ArrayList<Tuple>();