Posted to commits@pig.apache.org by ol...@apache.org on 2008/03/06 20:11:27 UTC

svn commit: r634380 - in /incubator/pig/trunk: ./ src/org/apache/pig/impl/eval/ src/org/apache/pig/impl/logicalLayer/parser/ src/org/apache/pig/impl/streaming/ test/org/apache/pig/test/

Author: olga
Date: Thu Mar  6 11:11:26 2008
New Revision: 634380

URL: http://svn.apache.org/viewvc?rev=634380&view=rev
Log:
PIG-94: M1 for streaming: maps and reduce side support with default
    (de)serializer (acmurthy via olgan)

Added:
    incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java
    incubator/pig/trunk/src/org/apache/pig/impl/streaming/
    incubator/pig/trunk/src/org/apache/pig/impl/streaming/PigExecutableManager.java
    incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java
Modified:
    incubator/pig/trunk/CHANGES.txt
    incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpecVisitor.java
    incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    incubator/pig/trunk/test/org/apache/pig/test/Util.java

Modified: incubator/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=634380&r1=634379&r2=634380&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Thu Mar  6 11:11:26 2008
@@ -157,3 +157,6 @@
 	PIG-120:  Support map reduce in local mode.  To do this user needs to
 	specify execution type as mapreduce and cluster name as local (joa23 via
 	gates).
+
+    PIG-94: M1 for streaming: maps and reduce side support with default
+    (de)serializer (acmurthy via olgan)

Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpecVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpecVisitor.java?rev=634380&r1=634379&r2=634380&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpecVisitor.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpecVisitor.java Thu Mar  6 11:11:26 2008
@@ -143,7 +143,12 @@
     public void visitTupleWindow(TupleWindowSpec tw) {
     }
 
-
+    /**
+     * Only StreamSpec.visit() and subclass implementations of this function 
+     * should ever call this method. 
+     */
+    public void visitStream(StreamSpec stream) {
+    }
 
 }
 

Added: incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java?rev=634380&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java Thu Mar  6 11:11:26 2008
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.eval;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.backend.streaming.PigExecutableManager;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.Datum;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.eval.collector.DataCollector;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+public class StreamSpec extends EvalSpec {
+    private static final long serialVersionUID = 1L;
+
+    private static final Log LOG = 
+        LogFactory.getLog(StreamSpec.class.getName());
+
+    private String streamingCommand;                       // Actual command to be run
+    private String deserializer;                           // LoadFunc to be used
+    private String serializer;                             // StoreFunc to be used
+
+    public StreamSpec(String streamingCommand) {
+        this.streamingCommand = streamingCommand;
+        this.deserializer = PigStorage.class.getName();
+        this.serializer = PigStorage.class.getName();
+    }
+
+    public StreamSpec(String streamingCommand, 
+                      String deserializer, String serializer) {
+        this.streamingCommand = streamingCommand;
+        this.deserializer = deserializer;
+        this.serializer = serializer;
+    }
+    
+    @Override
+    public List<String> getFuncs() {
+        // No user-defined functions here
+        return new ArrayList<String>();
+    }
+
+    protected Schema mapInputSchema(Schema schema) {
+        // EvalSpec _has_ to have the schema if there is one...
+        return null;
+    }
+
+    protected DataCollector setupDefaultPipe(DataCollector endOfPipe) {
+        return new StreamDataCollector(streamingCommand,
+                                       (deserializer == null) ? new PigStorage() :
+                                         (LoadFunc)PigContext.instantiateFuncFromSpec(
+                                                                        deserializer),            
+                                       (serializer == null) ? new PigStorage() :
+                                         (StoreFunc)PigContext.instantiateFuncFromSpec(
+                                                                        serializer),
+                                      endOfPipe);
+    }
+
+    public void visit(EvalSpecVisitor v) {
+        v.visitStream(this);
+    }
+
+    /**
+     * A simple {@link DataCollector} which wraps a {@link PigExecutableManager}
+     * and lets it handle the input to and the output from the managed executable.
+     */
+    private static class StreamDataCollector extends DataCollector {
+        PigExecutableManager executable;                          //Executable manager
+        
+        public StreamDataCollector(String streamingCommand,
+                                   LoadFunc deserializer, StoreFunc serializer,
+                                   DataCollector endOfPipe) {
+            super(endOfPipe);
+
+            DataCollector successor = 
+                new DataCollector(endOfPipe) {
+                public void add(Datum d) {
+                    // Just forward the data to the next EvalSpec in the pipeline
+                    addToSuccessor(d);
+                }
+            };
+
+            try {
+                // Create the PigExecutableManager
+                executable = new PigExecutableManager(streamingCommand, 
+                                                      deserializer, serializer, 
+                                                      successor);
+                
+                executable.configure();
+                
+                // Start the executable
+                executable.run();
+            } catch (Exception e) {
+                LOG.fatal("Failed to create/start PigExecutableManager with: " + e);
+                throw new RuntimeException(e);
+            }
+        }
+
+        public void add(Datum d) {
+            try {
+                executable.add(d);
+            } catch (IOException ioe) {
+                LOG.fatal("executable.add(" + d + ") failed with: " + ioe);
+                throw new RuntimeException(ioe);
+            }
+        }
+
+        protected void finish() {
+            try {
+                executable.close();
+            }
+            catch (Exception e) {
+                LOG.fatal("Failed to close PigExecutableManager with: " + e);
+                throw new RuntimeException(e);
+            }
+        }
+    }
+}
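
For reference, the two constructors above differ only in whether the default PigStorage
(de)serializer is used for the data crossing the pipe to the external command. A minimal
sketch of building a StreamSpec either way (assuming org.apache.pig.impl.eval.StreamSpec and
org.apache.pig.builtin.PigStorage are imported; the perl command is illustrative):

    // Default constructor: PigStorage (de)serializes tuples to/from the command.
    StreamSpec defaultSpec = new StreamSpec("perl -ne 'print $_'");

    // Explicit funcspec strings for the LoadFunc (stdout side) and StoreFunc (stdin side).
    StreamSpec explicitSpec =
        new StreamSpec("perl -ne 'print $_'",
                       PigStorage.class.getName(),   // deserializer (LoadFunc)
                       PigStorage.class.getName());  // serializer (StoreFunc)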

Modified: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=634380&r1=634379&r2=634380&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Thu Mar  6 11:11:26 2008
@@ -298,6 +298,9 @@
 TOKEN : { <GENERATE : "generate"> }
 TOKEN : { <FLATTEN : "flatten"> }
 TOKEN : { <EVAL : "eval"> }
+TOKEN : { <STREAM : "stream"> }
+TOKEN : { <THROUGH : "through"> }
+TOKEN : { <BACKTICK : "`"> }
 
 TOKEN:
 {
@@ -316,6 +319,7 @@
 }
 
 TOKEN : { <QUOTEDSTRING : "'" (~["'"])* "'"> }
+TOKEN : { <EXECCOMMAND : "`" (~["`"])* "`"> }
 // Pig has special variables starting with $
 TOKEN : { <DOLLARVAR : "$" <INTEGER> > }
 
@@ -411,6 +415,7 @@
 |   (<JOIN> op = JoinClause())
 |	(<UNION> op = UnionClause())
 |	(<FOREACH> op = ForEachClause())
+|   (<STREAM> op = StreamClause() [<AS> schema = SchemaTuple() {op.setSchema(schema);} ])
 	)
     [<PARALLEL> t2=<NUMBER> { op.setRequestedParallelism(Integer.parseInt(t2.image));} ]
 	)	
@@ -1126,3 +1131,22 @@
 		return item;
 	}
 }
+
+LogicalOperator StreamClause(): {LogicalOperator input; String streamingCommand;}
+{
+	input = NestedExpr()	
+	
+	<THROUGH> streamingCommand = StreamingCommand()
+	{
+		return new LOEval(opTable, scope, getNextId(), input.getOperatorKey(), new StreamSpec(streamingCommand));
+	}
+}
+
+String StreamingCommand(): {Token t;}
+{
+	t = <EXECCOMMAND>
+	{
+		return unquote(t.image);
+	}
+}
+
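
With the STREAM, THROUGH and EXECCOMMAND additions above, the parser now accepts statements of
the following shape. A sketch mirroring the queries registered in TestStreaming below (pigServer
and the alias A are assumed to be set up beforehand):

    // Stream the relation A through an external command enclosed in backticks.
    pigServer.registerQuery("B = stream A through `perl -ne 'print $_'`;");

    // Optionally declare the schema of the streamed output with AS.
    pigServer.registerQuery("C = stream A through `perl -ne 'print $_'` as (f0, f1);");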

Added: incubator/pig/trunk/src/org/apache/pig/impl/streaming/PigExecutableManager.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/PigExecutableManager.java?rev=634380&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/streaming/PigExecutableManager.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/PigExecutableManager.java Thu Mar  6 11:11:26 2008
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.streaming;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Datum;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.eval.collector.DataCollector;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+
+/**
+ * {@link PigExecutableManager} manages an external executable which processes data
+ * in a Pig query.
+ * 
+ * The <code>PigExecutableManager</code> is responsible for startup/teardown of the 
+ * external process and also for managing it.
+ * It feeds input records to the executable via its <code>stdin</code>, 
+ * collects the output records from its <code>stdout</code> and also diagnostic 
+ * information from its <code>stderr</code>.
+ */
+public class PigExecutableManager {
+	private static final Log LOG = 
+		LogFactory.getLog(PigExecutableManager.class.getName());
+
+	String command;                            // Streaming command to be run
+	String[] argv;                             // Parsed/split commands 
+
+	Process process;                           // Handle to the process
+	private static int SUCCESS = 0;
+	
+	protected DataOutputStream stdin;          // stdin of the process
+	StoreFunc serializer;                      // serializer to be used to
+	                                           // send data to the process
+	
+	ProcessOutputThread stdoutThread;          // thread to get process output
+	InputStream stdout;                        // stdout of the process
+	LoadFunc deserializer;                     // deserializer to be used to
+	                                           // interpret the process' output
+	
+	ProcessErrorThread stderrThread;           // thread to get process stderr
+	InputStream stderr;                        // stderr of the process
+	
+	DataCollector endOfPipe;
+
+	public PigExecutableManager(String command, 
+			                    LoadFunc deserializer, StoreFunc serializer, 
+			                    DataCollector endOfPipe) throws Exception {
+		this.command = command;
+		
+		this.argv = splitArgs(this.command);
+		if (LOG.isDebugEnabled()) {
+		    for (String cmd : argv) {
+		        LOG.debug("argv: " + cmd);
+		    }
+		}
+		
+		this.deserializer = deserializer;
+		this.serializer = serializer;
+		this.endOfPipe = endOfPipe;
+	}
+
+	private static final char SINGLE_QUOTE = '\'';
+	private static final char DOUBLE_QUOTE = '"';
+	private static String[] splitArgs(String command) throws Exception {
+		List<String> argv = new ArrayList<String>();
+
+		int beginIndex = 0;
+		
+		while (beginIndex < command.length()) {
+			// Skip spaces
+		    while (Character.isWhitespace(command.charAt(beginIndex))) {
+		        ++beginIndex;
+		    }
+			
+			char delim = ' ';
+			char charAtIndex = command.charAt(beginIndex);
+			if (charAtIndex == SINGLE_QUOTE || charAtIndex == DOUBLE_QUOTE) {
+				delim = charAtIndex;
+			}
+			
+			int endIndex = command.indexOf(delim, beginIndex+1);
+			if (endIndex == -1) {
+				if (Character.isWhitespace(delim)) {
+					// Reached end of command-line
+					argv.add(command.substring(beginIndex));
+					break;
+				} else {
+					// Didn't find the ending quote/double-quote
+					throw new ParseException("Illegal command: " + command);
+				}
+			}
+			
+			if (Character.isWhitespace(delim)) {
+				// Do not consume the space
+				argv.add(command.substring(beginIndex, endIndex));
+			} else {
+				// Do not consume the quotes
+				argv.add(command.substring(beginIndex+1, endIndex));
+			}
+			
+			beginIndex = endIndex + 1;
+		}
+		
+		return argv.toArray(new String[0]);
+	}
+
+	public void configure() {
+	}
+
+	public void close() throws Exception {
+	    // Close the stdin to let the process terminate
+		stdin.flush();
+		stdin.close();
+		stdin = null;
+		
+		// Wait for the process to exit and the stdout/stderr threads to complete
+		int exitCode = -1;
+		try {
+			exitCode = process.waitFor();
+			
+			if (stdoutThread != null) {
+				stdoutThread.join(0);
+			}
+			if (stderrThread != null) {
+				stderrThread.join(0);
+			}
+
+		} catch (InterruptedException ie) {}
+
+		// Clean up the process
+		process.destroy();
+		
+        LOG.debug("Process exited with: " + exitCode);
+        if (exitCode != SUCCESS) {
+            throw new ExecException(command + " failed with exit status: " + 
+                                       exitCode);
+        }
+	}
+
+	public void run() throws IOException {
+		// Run the executable
+		ProcessBuilder processBuilder = new ProcessBuilder(argv);
+		process = processBuilder.start();
+		LOG.debug("Started the process for command: " + command);
+
+		// Pick up the process' stdin/stdout/stderr streams
+		stdin = 
+			new DataOutputStream(new BufferedOutputStream(process.getOutputStream()));
+		stdout = 
+			new DataInputStream(new BufferedInputStream(process.getInputStream()));
+		stderr = 
+			new DataInputStream(new BufferedInputStream(process.getErrorStream()));
+
+		// Attach the serializer to the stdin of the process for sending tuples 
+		serializer.bindTo(stdin);
+		
+		// Attach the deserializer to the stdout of the process to get tuples
+		deserializer.bindTo("", new BufferedPositionedInputStream(stdout), 0, 
+				            Long.MAX_VALUE);
+		
+		// Start the threads to process the executable's stdout and stderr
+		stdoutThread = new ProcessOutputThread(deserializer);
+		stdoutThread.start();
+		stderrThread = new ProcessErrorThread();
+		stderrThread.start();
+	}
+
+	public void add(Datum d) throws IOException {
+		// Pass the serialized tuple to the executable
+		serializer.putNext((Tuple)d);
+		stdin.flush();
+	}
+
+	/**
+	 * Workhorse to process the stdout of the managed process.
+	 * 
+	 * The <code>PigExecutableManager</code>, by default, just pushes the received
+	 * <code>Datum</code> into the eval pipeline to be processed by the successor.
+	 * 
+	 * @param d <code>Datum</code> to process
+	 */
+	protected void processOutput(Datum d) {
+		endOfPipe.add(d);
+	}
+	
+	class ProcessOutputThread extends Thread {
+
+		LoadFunc deserializer;
+
+		ProcessOutputThread(LoadFunc deserializer) {
+			setDaemon(true);
+			this.deserializer = deserializer;
+		}
+
+		public void run() {
+			try {
+				// Read tuples from the executable and push them down the pipe
+				Tuple tuple = null;
+				while ((tuple = deserializer.getNext()) != null) {
+					processOutput(tuple);
+				}
+
+				if (stdout != null) {
+					stdout.close();
+					LOG.debug("ProcessOutputThread done");
+				}
+			} catch (Throwable th) {
+				LOG.warn(th);
+				try {
+					if (stdout != null) {
+						stdout.close();
+					}
+				} catch (IOException ioe) {
+					LOG.info(ioe);
+				}
+				throw new RuntimeException(th);
+			}
+		}
+	}
+
+	/**
+	 * Workhorse to process the stderr stream of the managed process.
+	 * 
+	 * By default <code>PigExecutableManager</code> just sends out the received
+	 * error message to its own <code>stderr</code>.
+	 * 
+	 * @param error error message from the managed process.
+	 */
+	protected void processError(String error) {
+		// Just send it out to our stderr
+		System.err.println(error);
+	}
+	
+	class ProcessErrorThread extends Thread {
+
+		public ProcessErrorThread() {
+			setDaemon(true);
+		}
+
+		public void run() {
+			try {
+				String error;
+				BufferedReader reader = 
+					new BufferedReader(new InputStreamReader(stderr));
+				while ((error = reader.readLine()) != null) {
+					processError(error);
+				}
+
+				if (stderr != null) {
+					stderr.close();
+					LOG.debug("ProcessErrorThread done");
+				}
+			} catch (Throwable th) {
+				LOG.warn(th);
+				try {
+					if (stderr != null) {
+						stderr.close();
+					}
+				} catch (IOException ioe) {
+					LOG.info(ioe);
+				}
+				throw new RuntimeException(th);
+			}
+		}
+	}
+}
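
The lifecycle described in the class javadoc is configure(), then run(), then add() once per
datum, then close(). A minimal driver sketch of that sequence (the successor collector and the
datum are hypothetical placeholders supplied by the surrounding eval pipeline, exactly as in
StreamSpec.StreamDataCollector above):

    // Sketch only: `successor` and `d` come from the enclosing eval pipeline.
    static void streamOneDatum(DataCollector successor, Datum d) throws Exception {
        PigExecutableManager mgr =
            new PigExecutableManager("perl -ne 'print $_'", // command to execute
                                     new PigStorage(),      // deserializer for its stdout
                                     new PigStorage(),      // serializer for its stdin
                                     successor);            // sink for the output tuples
        mgr.configure();   // currently a no-op
        mgr.run();         // start the process, bind the (de)serializers, spawn the I/O threads
        mgr.add(d);        // serialize one datum to the process' stdin
        mgr.close();       // flush/close stdin, wait for exit, fail on a non-zero status
    }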

Added: incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java?rev=634380&view=auto
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java (added)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java Thu Mar  6 11:11:26 2008
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.pig.PigServer;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.*;
+import static org.apache.pig.PigServer.ExecType.MAPREDUCE;
+
+import junit.framework.TestCase;
+
+public class TestStreaming extends TestCase {
+
+    MiniCluster cluster = MiniCluster.buildCluster();
+
+	private static final String simpleEchoStreamingCommand = 
+		"perl -ne 'chomp $_; print \"$_\n\"'";
+
+	private Tuple[] setupExpectedResults(String[] firstField, int[] secondField) {
+		Assert.assertEquals(firstField.length, secondField.length);
+		
+		Tuple[] expectedResults = new Tuple[firstField.length];
+		for (int i=0; i < expectedResults.length; ++i) {
+			expectedResults[i] = new Tuple(2);
+			expectedResults[i].setField(0, firstField[i]);
+			expectedResults[i].setField(1, secondField[i]);
+		}
+		
+		return expectedResults;
+	}
+	
+	@Test
+	public void testSimpleMapSideStreaming() throws Exception {
+		PigServer pigServer = new PigServer(MAPREDUCE);
+
+		File input = Util.createInputFile("tmp", "", 
+				                          new String[] {"A,1", "B,2", "C,3", "D,2",
+				                                        "A,5", "B,5", "C,8", "A,8",
+				                                        "D,8", "A,9"});
+
+		// Expected results
+		String[] expectedFirstFields = new String[] {"A", "B", "C", "A", "D", "A"};
+		int[] expectedSecondFields = new int[] {5, 5, 8, 8, 8, 9};
+		Tuple[] expectedResults = 
+			setupExpectedResults(expectedFirstFields, expectedSecondFields);
+
+		// Pig query to run
+		pigServer.registerQuery("INPUT = load 'file:" + input + "' using " + 
+				                PigStorage.class.getName() + "(',');");
+		pigServer.registerQuery("FILTERED_DATA = filter INPUT by $1 > '3';");
+		pigServer.registerQuery("OUTPUT = stream FILTERED_DATA through `" +
+				                simpleEchoStreamingCommand + "`;");
+		
+		// Run the query and check the results
+		Util.checkQueryOutputs(pigServer.openIterator("OUTPUT"), expectedResults);
+	}
+
+	@Test
+	public void testSimpleMapSideStreamingWithOutputSchema() throws Exception {
+		PigServer pigServer = new PigServer(MAPREDUCE);
+
+		File input = Util.createInputFile("tmp", "", 
+				                          new String[] {"A,1", "B,2", "C,3", "D,2",
+				                                        "A,5", "B,5", "C,8", "A,8",
+				                                        "D,8", "A,9"});
+
+		// Expected results
+		String[] expectedFirstFields = new String[] {"C", "A", "D", "A"};
+		int[] expectedSecondFields = new int[] {8, 8, 8, 9};
+		Tuple[] expectedResults = 
+			setupExpectedResults(expectedFirstFields, expectedSecondFields);
+
+		// Pig query to run
+		pigServer.registerQuery("INPUT = load 'file:" + input + "' using " + 
+				                PigStorage.class.getName() + "(',');");
+		pigServer.registerQuery("FILTERED_DATA = filter INPUT by $1 > '3';");
+		pigServer.registerQuery("STREAMED_DATA = stream FILTERED_DATA through `" +
+				                simpleEchoStreamingCommand + "` as (f0, f1);");
+		pigServer.registerQuery("OUTPUT = filter STREAMED_DATA by f1 > '6';");
+		
+		// Run the query and check the results
+		Util.checkQueryOutputs(pigServer.openIterator("OUTPUT"), expectedResults);
+	}
+
+	@Test
+	public void testSimpleReduceSideStreamingAfterFlatten() throws Exception {
+		PigServer pigServer = new PigServer(MAPREDUCE);
+
+		File input = Util.createInputFile("tmp", "", 
+				                          new String[] {"A,1", "B,2", "C,3", "D,2",
+				                                        "A,5", "B,5", "C,8", "A,8",
+				                                        "D,8", "A,9"});
+
+		// Expected results
+		String[] expectedFirstFields = new String[] {"A", "A", "A", "B", "C", "D"};
+		int[] expectedSecondFields = new int[] {5, 8, 9, 5, 8, 8};
+		Tuple[] expectedResults = 
+			setupExpectedResults(expectedFirstFields, expectedSecondFields);
+
+		// Pig query to run
+		pigServer.registerQuery("INPUT = load 'file:" + input + "' using " + 
+				                PigStorage.class.getName() + "(',');");
+		pigServer.registerQuery("FILTERED_DATA = filter INPUT by $1 > '3';");
+		pigServer.registerQuery("GROUPED_DATA = group FILTERED_DATA by $0;");
+		pigServer.registerQuery("FLATTENED_GROUPED_DATA = foreach GROUPED_DATA " +
+				                "generate flatten($1);");
+		pigServer.registerQuery("OUTPUT = stream FLATTENED_GROUPED_DATA through `" +
+				                simpleEchoStreamingCommand + "`;");
+		
+		// Run the query and check the results
+		Util.checkQueryOutputs(pigServer.openIterator("OUTPUT"), expectedResults);
+	}
+
+	@Test
+	public void testSimpleOrderedReduceSideStreamingAfterFlatten() throws Exception {
+		PigServer pigServer = new PigServer(MAPREDUCE);
+
+		File input = Util.createInputFile("tmp", "", 
+				                          new String[] {"A,1,2,3", "B,2,4,5",
+				                                        "C,3,1,2", "D,2,5,2",
+				                                        "A,5,5,1", "B,5,7,4",
+				                                        "C,8,9,2", "A,8,4,5",
+				                                        "D,8,8,3", "A,9,2,5"}
+		                                 );
+
+		// Expected results
+		String[] expectedFirstFields = 
+			new String[] {"A", "A", "A", "A", "B", "B", "C", "C", "D", "D"};
+		int[] expectedSecondFields = new int[] {1, 9, 8, 5, 2, 5, 3, 8, 2, 8};
+		int[] expectedThirdFields = new int[] {2, 2, 4, 5, 4, 7, 1, 9, 5, 8};
+		int[] expectedFourthFields = new int[] {3, 5, 5, 1, 5, 4, 2, 2, 2, 3};
+		Tuple[] expectedResults = new Tuple[10];
+		for (int i = 0; i < expectedResults.length; ++i) {
+			expectedResults[i] = new Tuple(4);
+			expectedResults[i].setField(0, expectedFirstFields[i]);
+			expectedResults[i].setField(1, expectedSecondFields[i]);
+			expectedResults[i].setField(2, expectedThirdFields[i]);
+			expectedResults[i].setField(3, expectedFourthFields[i]);
+		}
+
+		// Pig query to run
+		pigServer.registerQuery("INPUT = load 'file:" + input + "' using " + 
+				                PigStorage.class.getName() + "(',');");
+		pigServer.registerQuery("FILTERED_DATA = filter INPUT by $1 > '3';");
+		pigServer.registerQuery("GROUPED_DATA = group INPUT by $0;");
+		pigServer.registerQuery("ORDERED_DATA = foreach GROUPED_DATA { " +
+				                "  D = order INPUT BY $2, $3;" +
+                                "  generate flatten(D);" +
+                                "};");
+		pigServer.registerQuery("OUTPUT = stream ORDERED_DATA through `" +
+				                simpleEchoStreamingCommand + "`;");
+		
+		// Run the query and check the results
+		Util.checkQueryOutputs(pigServer.openIterator("OUTPUT"), expectedResults);
+	}
+	
+}

Modified: incubator/pig/trunk/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/Util.java?rev=634380&r1=634379&r2=634380&view=diff
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ incubator/pig/trunk/test/org/apache/pig/test/Util.java Thu Mar  6 11:11:26 2008
@@ -17,9 +17,13 @@
  */
 package org.apache.pig.test;
 
+import java.io.File;
 import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Iterator;
 
 import org.apache.pig.data.*;
+import org.junit.Assert;
 
 public class Util {
     // Helper Functions
@@ -68,4 +72,43 @@
         }
         return t;
     }
+
+    /**
+     * Helper to create a temporary file with given input data for use in test cases.
+     *  
+     * @param tmpFilenamePrefix file-name prefix
+     * @param tmpFilenameSuffix file-name suffix
+     * @param inputData input for test cases, each string in inputData[] is written
+     *                  on one line
+     * @return {@link File} handle to the created temporary file
+     * @throws IOException
+     */
+	static public File createInputFile(String tmpFilenamePrefix, 
+			                           String tmpFilenameSuffix, 
+			                           String[] inputData) 
+	throws IOException {
+		File f = File.createTempFile(tmpFilenamePrefix, tmpFilenameSuffix);
+		PrintWriter pw = new PrintWriter(f);
+		for (int i=0; i<inputData.length; i++){
+			pw.println(inputData[i]);
+		}
+		pw.close();
+		return f;
+	}
+
+	/**
+	 * Helper function to check if the result of a Pig Query is in line with 
+	 * expected results.
+	 * 
+	 * @param actualResults Result of the executed Pig query
+	 * @param expectedResults Expected results to validate against
+	 */
+	static public void checkQueryOutputs(Iterator<Tuple> actualResults, 
+			                        Tuple[] expectedResults) {
+		for (Tuple expected : expectedResults) {
+			Tuple actual = actualResults.next();
+			Assert.assertEquals(expected, actual);
+		}
+	}
+
 }
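
Taken together, the new helpers let a test create its input file and verify the query output in
a few lines. A sketch mirroring TestStreaming above (pigServer is assumed to be constructed as
in those tests, and the expected tuple is illustrative):

    File input = Util.createInputFile("tmp", "", new String[] {"A,1", "B,2"});
    pigServer.registerQuery("A = load 'file:" + input + "' using " +
                            PigStorage.class.getName() + "(',');");
    Tuple expected = new Tuple(2);
    expected.setField(0, "A");
    expected.setField(1, 1);
    Util.checkQueryOutputs(pigServer.openIterator("A"), new Tuple[] { expected });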