You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/06/26 01:18:22 UTC
svn commit: r671681 - in /incubator/pig/trunk/src/org/apache/pig:
backend/hadoop/streaming/ impl/eval/ impl/logicalLayer/optimizer/streaming/
impl/streaming/
Author: olga
Date: Wed Jun 25 16:18:22 2008
New Revision: 671681
URL: http://svn.apache.org/viewvc?rev=671681&view=rev
Log:
PIG-272: streaming with intermediate store is broken
Modified:
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java
incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java
Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java?rev=671681&r1=671680&r2=671681&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java Wed Jun 25 16:18:22 2008
@@ -194,12 +194,7 @@
private void writeDebugHeader() {
processError("===== Task Information Header =====" );
- StringBuffer sb = new StringBuffer();
- for (String arg : command.getCommandArgs()) {
- sb.append(arg);
- sb.append(" ");
- }
- processError("\nCommand: " + sb.toString());
+ processError("\nCommand: " + command);
processError("\nStart time: " + new Date(System.currentTimeMillis()));
if (job.getBoolean("mapred.task.is.map", false)) {
processError("\nInput-split file: " + job.get("map.input.file"));
Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java?rev=671681&r1=671680&r2=671681&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java Wed Jun 25 16:18:22 2008
@@ -28,6 +28,8 @@
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.streaming.ExecutableManager;
import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.impl.streaming.StreamingCommand.Handle;
+import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
import org.apache.pig.data.Datum;
import org.apache.pig.impl.eval.collector.DataCollector;
import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -40,10 +42,13 @@
private String executableManager; // ExecutableManager to use
private StreamingCommand command; // Actual command to be run
+ private StreamingCommand originalCommand; // Original command
+ private StreamingCommand optimizedCommand; // Optimized command
public StreamSpec(ExecutableManager executableManager,
StreamingCommand command) {
this.executableManager = executableManager.getClass().getName();
+ this.originalCommand = command;
this.command = command;
// Setup streaming-specific properties
@@ -75,12 +80,54 @@
/**
* Get the {@link StreamingCommand} for this <code>StreamSpec</code>.
- * @return
+ * @return the {@link StreamingCommand} for this <code>StreamSpec</code>
*/
public StreamingCommand getCommand() {
return command;
}
+ /**
+ * Set the optimized {@link HandleSpec} for the given {@link Handle} of the
+ * <code>StreamSpec</code>.
+ *
+ * @param handle <code>Handle</code> to optimize
+ * @param spec optimized specification for the handle
+ */
+ public void setOptimizedSpec(Handle handle, String spec) {
+ if (optimizedCommand == null) {
+ optimizedCommand = (StreamingCommand)command.clone();
+ }
+
+ if (handle == Handle.INPUT) {
+ HandleSpec streamInputSpec = optimizedCommand.getInputSpec();
+ streamInputSpec.setSpec(spec);
+ } else if (handle == Handle.OUTPUT) {
+ HandleSpec streamOutputSpec = optimizedCommand.getOutputSpec();
+ streamOutputSpec.setSpec(spec);
+ }
+
+ command = optimizedCommand;
+ }
+
+ /**
+ * Revert the optimized {@link HandleSpec} for the given {@link Handle} of this
+ * <code>StreamSpec</code> to the specification of the original command.
+ */
+ public void revertOptimizedCommand(Handle handle) {
+ if (optimizedCommand == null) {
+ return;
+ }
+
+ if (handle == Handle.INPUT &&
+ !command.getInputSpec().equals(originalCommand.getInputSpec())) {
+ command.setInputSpec(originalCommand.getInputSpec());
+ } else if (handle == Handle.OUTPUT &&
+ !command.getOutputSpec().equals(
+ originalCommand.getOutputSpec())) {
+ command.setOutputSpec(originalCommand.getOutputSpec());
+ }
+ }
+
public List<String> getFuncs() {
// No user-defined functions here
return new ArrayList<String>();
@@ -101,6 +148,10 @@
v.visitStream(this);
}
+ public String toString() {
+ return command.toString();
+ }
+
/**
* A simple {@link DataCollector} which wraps a {@link ExecutableManager}
* and lets it handle the input and the output to the managed executable.
Modified: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java?rev=671681&r1=671680&r2=671681&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java Wed Jun 25 16:18:22 2008
@@ -37,6 +37,7 @@
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.logicalLayer.optimizer.Optimizer;
import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.impl.streaming.StreamingCommand.Handle;
import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
/**
@@ -107,13 +108,16 @@
// Since they both are the same, we can flip them
// for BinaryStorage
load.setInputFileSpec(new FileSpec(loadFileSpec.getFileName(), BinaryStorage.class.getName()));
-
- streamInputSpec.setSpec(BinaryStorage.class.getName());
- command.setInputSpec(streamInputSpec);
-
+ streamSpec.setOptimizedSpec(Handle.INPUT,
+ BinaryStorage.class.getName());
optimize = true;
}
}
+ } else {
+ if (e.getSpec() instanceof StreamSpec) {
+ StreamSpec streamSpec = (StreamSpec)e.getSpec();
+ streamSpec.revertOptimizedCommand(Handle.INPUT);
+ }
}
parentLoad = false;
Modified: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java?rev=671681&r1=671680&r2=671681&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java Wed Jun 25 16:18:22 2008
@@ -37,6 +37,7 @@
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.logicalLayer.optimizer.Optimizer;
import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.impl.streaming.StreamingCommand.Handle;
import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
/**
@@ -62,6 +63,11 @@
super.visitEval(e);
eval = e;
parentEval = true;
+
+ if (e.getSpec() instanceof StreamSpec) {
+ StreamSpec streamSpec = (StreamSpec)e.getSpec();
+ streamSpec.revertOptimizedCommand(Handle.OUTPUT);
+ }
}
public void visitLoad(LOLoad load) {
@@ -133,10 +139,8 @@
// Since they both are the same, we can flip them
// for BinaryStorage
s.setOutputFileSpec(new FileSpec(storeFileSpec.getFileName(), BinaryStorage.class.getName()));
-
- streamOutputSpec.setSpec(BinaryStorage.class.getName());
- command.setOutputSpec(streamOutputSpec);
-
+ streamSpec.setOptimizedSpec(Handle.OUTPUT,
+ BinaryStorage.class.getName());
optimize = true;
}
}
Modified: incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java?rev=671681&r1=671680&r2=671681&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java Wed Jun 25 16:18:22 2008
@@ -5,6 +5,8 @@
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -25,7 +27,7 @@
* details such as input/output/error specifications and also files to be
* shipped to the cluster and files to be cached.
*/
-public class StreamingCommand implements Serializable {
+public class StreamingCommand implements Serializable, Cloneable {
private static final long serialVersionUID = 1L;
// External command to be executed and it's parsed components
@@ -360,9 +362,40 @@
}
public String toString() {
- return executable;
+ StringBuffer sb = new StringBuffer();
+ for (String arg : getCommandArgs()) {
+ sb.append(arg);
+ sb.append(" ");
+ }
+ sb.append("(" + getInputSpec().toString() + "/"+getOutputSpec() + ")");
+
+ return sb.toString();
}
+ public Object clone() {
+ try {
+ StreamingCommand clone = (StreamingCommand)super.clone();
+
+ clone.shipSpec = new ArrayList<String>(shipSpec);
+ clone.cacheSpec = new ArrayList<String>(cacheSpec);
+
+ clone.handleSpecs = new HashMap<Handle, List<HandleSpec>>();
+ for (Map.Entry<Handle, List<HandleSpec>> e : handleSpecs.entrySet()) {
+ List<HandleSpec> values = new ArrayList<HandleSpec>();
+ for (HandleSpec spec : e.getValue()) {
+ values.add((HandleSpec)spec.clone());
+ }
+ clone.handleSpecs.put(e.getKey(), values);
+ }
+
+ return clone;
+ } catch (CloneNotSupportedException cnse) {
+ // Shouldn't happen since we do implement Cloneable
+ throw new InternalError(cnse.toString());
+ }
+ }
+
+
/**
* Specification about the usage of the {@link Handle} to communicate
* with the external process.
@@ -373,7 +406,7 @@
* to/from the stream.
*/
public static class HandleSpec
- implements Comparable<HandleSpec>, Serializable {
+ implements Comparable<HandleSpec>, Serializable, Cloneable {
private static final long serialVersionUID = 1L;
String name;
@@ -408,7 +441,7 @@
}
public String toString() {
- return name + " using " + spec;
+ return name + "-" + spec;
}
/**
@@ -450,5 +483,20 @@
public void setSpec(String spec) {
this.spec = spec;
}
+
+ public boolean equals(Object obj) {
+ HandleSpec other = (HandleSpec)obj;
+ return (name.equals(other.name) && spec.equals(other.spec));
+ }
+
+
+ public Object clone() {
+ try {
+ return super.clone();
+ } catch (CloneNotSupportedException cnse) {
+ // Shouldn't happen since we do implement Cloneable
+ throw new InternalError(cnse.toString());
+ }
+ }
}
}
\ No newline at end of file