You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/03/06 20:11:27 UTC
svn commit: r634380 - in /incubator/pig/trunk: ./
src/org/apache/pig/impl/eval/ src/org/apache/pig/impl/logicalLayer/parser/
src/org/apache/pig/impl/streaming/ test/org/apache/pig/test/
Author: olga
Date: Thu Mar 6 11:11:26 2008
New Revision: 634380
URL: http://svn.apache.org/viewvc?rev=634380&view=rev
Log:
PIG-94: M1 for streaming: maps and reduce side support with default
(de)serializer (acmurthy via olgan)
Added:
incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java
incubator/pig/trunk/src/org/apache/pig/impl/streaming/
incubator/pig/trunk/src/org/apache/pig/impl/streaming/PigExecutableManager.java
incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java
Modified:
incubator/pig/trunk/CHANGES.txt
incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpecVisitor.java
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
incubator/pig/trunk/test/org/apache/pig/test/Util.java
Modified: incubator/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=634380&r1=634379&r2=634380&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Thu Mar 6 11:11:26 2008
@@ -157,3 +157,6 @@
PIG-120: Support map reduce in local mode. To do this user needs to
specify execution type as mapreduce and cluster name as local (joa23 via
gates).
+
+ PIG-94: M1 for streaming: maps and reduce side support with default
+ (de)serializer (acmurthy via olgan)
Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpecVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpecVisitor.java?rev=634380&r1=634379&r2=634380&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpecVisitor.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpecVisitor.java Thu Mar 6 11:11:26 2008
@@ -143,7 +143,12 @@
public void visitTupleWindow(TupleWindowSpec tw) {
}
-
+    /**
+     * Visits a {@link StreamSpec}. Only StreamSpec.visit() and subclass
+     * implementations of this method should ever invoke it.
+     *
+     * @param stream the streaming spec being visited
+     */
+    public void visitStream(StreamSpec stream) {
+        // Default visitor does nothing with streaming specs.
+    }
}
Added: incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java?rev=634380&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/StreamSpec.java Thu Mar 6 11:11:26 2008
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.eval;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.backend.streaming.PigExecutableManager;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.Datum;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.eval.collector.DataCollector;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * {@link EvalSpec} backing the <code>stream ... through `command`</code>
+ * operator: every tuple reaching this spec is written to an external
+ * command's stdin, and the tuples the command writes back on its stdout are
+ * passed on down the eval pipeline.
+ */
+public class StreamSpec extends EvalSpec {
+    private static final long serialVersionUID = 1L;
+
+    private static final Log LOG =
+        LogFactory.getLog(StreamSpec.class.getName());
+
+    private final String streamingCommand; // Actual command to be run
+    private final String deserializer;     // LoadFunc to be used
+    private final String serializer;       // StoreFunc to be used
+
+    /**
+     * Creates a spec that runs <code>streamingCommand</code> and uses
+     * {@link PigStorage} both to serialize its input and to deserialize its
+     * output.
+     *
+     * @param streamingCommand command line of the external executable
+     */
+    public StreamSpec(String streamingCommand) {
+        this(streamingCommand,
+             PigStorage.class.getName(), PigStorage.class.getName());
+    }
+
+    /**
+     * Creates a spec with explicit (de)serializers.
+     *
+     * @param streamingCommand command line of the external executable
+     * @param deserializer function spec of the {@link LoadFunc} used to parse
+     *                     the command's output; <code>null</code> means
+     *                     {@link PigStorage}
+     * @param serializer function spec of the {@link StoreFunc} used to write
+     *                   tuples to the command's input; <code>null</code>
+     *                   means {@link PigStorage}
+     */
+    public StreamSpec(String streamingCommand,
+                      String deserializer, String serializer) {
+        this.streamingCommand = streamingCommand;
+        this.deserializer = deserializer;
+        this.serializer = serializer;
+    }
+
+    @Override
+    public List<String> getFuncs() {
+        // No user-defined functions here
+        return new ArrayList<String>();
+    }
+
+    /**
+     * Returns <code>null</code>: the schema of the command's output cannot be
+     * derived from its input. A schema supplied in the query
+     * (<code>stream ... as (f0, f1)</code>) is presumably attached to the
+     * enclosing logical operator instead; confirm against the parser.
+     */
+    protected Schema mapInputSchema(Schema schema) {
+        return null;
+    }
+
+    /**
+     * Puts a {@link StreamDataCollector} in front of <code>endOfPipe</code>.
+     * It instantiates the configured (de)serializers, falling back to
+     * {@link PigStorage} when they are <code>null</code>. Constructing the
+     * collector starts the external process.
+     */
+    protected DataCollector setupDefaultPipe(DataCollector endOfPipe) {
+        LoadFunc loader = (deserializer == null) ?
+            new PigStorage() :
+            (LoadFunc)PigContext.instantiateFuncFromSpec(deserializer);
+        StoreFunc storer = (serializer == null) ?
+            new PigStorage() :
+            (StoreFunc)PigContext.instantiateFuncFromSpec(serializer);
+        return new StreamDataCollector(streamingCommand, loader, storer,
+                                       endOfPipe);
+    }
+
+    public void visit(EvalSpecVisitor v) {
+        v.visitStream(this);
+    }
+
+    /**
+     * A simple {@link DataCollector} which wraps a {@link PigExecutableManager}
+     * and lets it handle the input and the output to the managed executable.
+     */
+    private static class StreamDataCollector extends DataCollector {
+        private final PigExecutableManager executable; // Executable manager
+
+        public StreamDataCollector(String streamingCommand,
+                                   LoadFunc deserializer, StoreFunc serializer,
+                                   DataCollector endOfPipe) {
+            super(endOfPipe);
+
+            // Tuples read back from the executable are forwarded unchanged
+            // to the next EvalSpec in the pipeline
+            DataCollector successor =
+                new DataCollector(endOfPipe) {
+                    public void add(Datum d) {
+                        addToSuccessor(d);
+                    }
+                };
+
+            executable = startExecutable(streamingCommand, deserializer,
+                                         serializer, successor);
+        }
+
+        /**
+         * Creates, configures and starts a {@link PigExecutableManager} that
+         * pushes the executable's output into <code>successor</code>.
+         *
+         * @throws RuntimeException wrapping any failure to create or start it
+         */
+        private static PigExecutableManager startExecutable(
+                String streamingCommand,
+                LoadFunc deserializer, StoreFunc serializer,
+                DataCollector successor) {
+            try {
+                PigExecutableManager manager =
+                    new PigExecutableManager(streamingCommand,
+                                             deserializer, serializer,
+                                             successor);
+                manager.configure();
+
+                // Start the executable
+                manager.run();
+                return manager;
+            } catch (Exception e) {
+                // Pass the exception so the stack trace is logged too
+                LOG.fatal("Failed to create/start PigExecutableManager with: " + e, e);
+                throw new RuntimeException(e);
+            }
+        }
+
+        /** Sends <code>d</code> to the executable's stdin. */
+        public void add(Datum d) {
+            try {
+                executable.add(d);
+            } catch (IOException ioe) {
+                LOG.fatal("executable.add(" + d + ") failed with: " + ioe, ioe);
+                throw new RuntimeException(ioe);
+            }
+        }
+
+        /**
+         * Closes the executable: waits for it to exit and fails if it exited
+         * with a non-zero status.
+         */
+        protected void finish() {
+            try {
+                executable.close();
+            } catch (Exception e) {
+                LOG.fatal("Failed to close PigExecutableManager with: " + e, e);
+                throw new RuntimeException(e);
+            }
+        }
+    }
+}
Modified: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=634380&r1=634379&r2=634380&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Thu Mar 6 11:11:26 2008
@@ -298,6 +298,9 @@
TOKEN : { <GENERATE : "generate"> }
TOKEN : { <FLATTEN : "flatten"> }
TOKEN : { <EVAL : "eval"> }
+TOKEN : { <STREAM : "stream"> }
+TOKEN : { <THROUGH : "through"> }
+TOKEN : { <BACKTICK : "`"> }
TOKEN:
{
@@ -316,6 +319,7 @@
}
TOKEN : { <QUOTEDSTRING : "'" (~["'"])* "'"> }
+TOKEN : { <EXECCOMMAND : "`" (~["`"])* "`"> }
// Pig has special variables starting with $
TOKEN : { <DOLLARVAR : "$" <INTEGER> > }
@@ -411,6 +415,7 @@
| (<JOIN> op = JoinClause())
| (<UNION> op = UnionClause())
| (<FOREACH> op = ForEachClause())
+| (<STREAM> op = StreamClause() [<AS> schema = SchemaTuple() {op.setSchema(schema);} ])
)
[<PARALLEL> t2=<NUMBER> { op.setRequestedParallelism(Integer.parseInt(t2.image));} ]
)
@@ -1126,3 +1131,22 @@
return item;
}
}
+
+// Parses "<nested-expr> through `command`". The <STREAM> keyword is consumed
+// by the caller, which also applies an optional "as (schema)" to the result.
+// Returns an LOEval over the input operator whose EvalSpec is a StreamSpec
+// built from the command with its single-argument constructor.
+LogicalOperator StreamClause(): {LogicalOperator input; String streamingCommand;}
+{
+ input = NestedExpr()
+
+ <THROUGH> streamingCommand = StreamingCommand()
+ {
+ return new LOEval(opTable, scope, getNextId(), input.getOperatorKey(), new StreamSpec(streamingCommand));
+ }
+}
+
+// Parses a backtick-delimited EXECCOMMAND token and returns its text passed
+// through unquote(). Presumably unquote() strips the surrounding backticks;
+// it is defined elsewhere in this grammar, so confirm.
+String StreamingCommand(): {Token t;}
+{
+ t = <EXECCOMMAND>
+ {
+ return unquote(t.image);
+ }
+}
+
Added: incubator/pig/trunk/src/org/apache/pig/impl/streaming/PigExecutableManager.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/PigExecutableManager.java?rev=634380&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/streaming/PigExecutableManager.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/PigExecutableManager.java Thu Mar 6 11:11:26 2008
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.streaming;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Datum;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.eval.collector.DataCollector;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+
+/**
+ * {@link PigExecutableManager} manages an external executable which processes data
+ * in a Pig query.
+ *
+ * The <code>PigExecutableManager</code> is responsible for startup/teardown of the
+ * external process and also for managing it.
+ * It feeds input records to the executable via its <code>stdin</code>,
+ * collects the output records from the <code>stdout</code> and also diagnostic
+ * information from the <code>stderr</code>.
+ */
+public class PigExecutableManager {
+    private static final Log LOG =
+        LogFactory.getLog(PigExecutableManager.class.getName());
+
+    String command;                   // Streaming command to be run
+    String[] argv;                    // Parsed/split commands
+
+    Process process;                  // Handle to the process
+    private static final int SUCCESS = 0; // exit status treated as success
+
+    protected DataOutputStream stdin; // stdin of the process
+    StoreFunc serializer;             // serializer to be used to
+                                      // send data to the process
+
+    ProcessOutputThread stdoutThread; // thread to get process output
+    InputStream stdout;               // stdout of the process
+    LoadFunc deserializer;            // deserializer to be used to
+                                      // interpret the process' output
+
+    ProcessErrorThread stderrThread;  // thread to get process' stderr
+    InputStream stderr;               // stderr of the process
+
+    DataCollector endOfPipe;          // receives tuples read from stdout
+
+    /**
+     * Creates a manager for <code>command</code>. The process is not started
+     * until {@link #run()} is called.
+     *
+     * @param command command line of the executable; it is split into
+     *                arguments on whitespace, with single/double-quoted
+     *                arguments kept intact
+     * @param deserializer {@link LoadFunc} used to parse the process' stdout
+     * @param serializer {@link StoreFunc} used to write tuples to its stdin
+     * @param endOfPipe collector receiving the tuples read from stdout
+     * @throws Exception if <code>command</code> cannot be parsed
+     */
+    public PigExecutableManager(String command,
+                                LoadFunc deserializer, StoreFunc serializer,
+                                DataCollector endOfPipe) throws Exception {
+        this.command = command;
+
+        this.argv = splitArgs(this.command);
+        if (LOG.isDebugEnabled()) {
+            for (String cmd : argv) {
+                LOG.debug("argv: " + cmd);
+            }
+        }
+
+        this.deserializer = deserializer;
+        this.serializer = serializer;
+        this.endOfPipe = endOfPipe;
+    }
+
+    private static final char SINGLE_QUOTE = '\'';
+    private static final char DOUBLE_QUOTE = '"';
+
+    /**
+     * Splits a command line into arguments. Arguments are separated by
+     * whitespace. An argument that starts with a single or double quote
+     * extends to the next matching quote, and the quotes are stripped.
+     *
+     * @param command command line to split
+     * @return the arguments, in order
+     * @throws ParseException if a quote is unterminated or the command
+     *                        contains no arguments at all
+     */
+    private static String[] splitArgs(String command) throws Exception {
+        List<String> argv = new ArrayList<String>();
+
+        int length = command.length();
+        int beginIndex = 0;
+        while (true) {
+            // Skip whitespace, stopping cleanly at the end of the string
+            // (trailing whitespace used to run off the end)
+            while (beginIndex < length &&
+                   Character.isWhitespace(command.charAt(beginIndex))) {
+                ++beginIndex;
+            }
+            if (beginIndex >= length) {
+                break;
+            }
+
+            char first = command.charAt(beginIndex);
+            if (first == SINGLE_QUOTE || first == DOUBLE_QUOTE) {
+                // Quoted argument: runs to the matching quote; the quotes
+                // themselves are not part of the argument
+                int endIndex = command.indexOf(first, beginIndex + 1);
+                if (endIndex == -1) {
+                    // Didn't find the ending quote/double-quote
+                    throw new ParseException("Illegal command: " + command);
+                }
+                argv.add(command.substring(beginIndex + 1, endIndex));
+                beginIndex = endIndex + 1;
+            } else {
+                // Unquoted argument: runs to the next whitespace character
+                // of any kind (not just ' ')
+                int endIndex = beginIndex + 1;
+                while (endIndex < length &&
+                       !Character.isWhitespace(command.charAt(endIndex))) {
+                    ++endIndex;
+                }
+                argv.add(command.substring(beginIndex, endIndex));
+                beginIndex = endIndex;
+            }
+        }
+
+        if (argv.isEmpty()) {
+            // Nothing to execute
+            throw new ParseException("Illegal command: " + command);
+        }
+        return argv.toArray(new String[argv.size()]);
+    }
+
+    /** Currently a no-op; StreamSpec calls it before {@link #run()}. */
+    public void configure() {
+    }
+
+    /**
+     * Closes the process' stdin, waits for the process and the stdout/stderr
+     * reader threads to finish, and then destroys the process.
+     *
+     * @throws ExecException if the process exits with a non-zero status, or
+     *                       if the wait is interrupted before the exit status
+     *                       is known
+     */
+    public void close() throws Exception {
+        // Close the stdin to let the process terminate
+        stdin.flush();
+        stdin.close();
+        stdin = null;
+
+        // Wait for the process to exit and the stdout/stderr threads to complete
+        int exitCode = -1;
+        try {
+            exitCode = process.waitFor();
+
+            if (stdoutThread != null) {
+                stdoutThread.join(0);
+            }
+            if (stderrThread != null) {
+                stderrThread.join(0);
+            }
+        } catch (InterruptedException ie) {
+            // Preserve the interrupt status for our caller
+            Thread.currentThread().interrupt();
+        }
+
+        // Clean up the process
+        process.destroy();
+
+        LOG.debug("Process exited with: " + exitCode);
+        if (exitCode != SUCCESS) {
+            throw new ExecException(command + " failed with exit status: " +
+                                    exitCode);
+        }
+    }
+
+    /**
+     * Starts the executable and binds the serializer to its stdin and the
+     * deserializer to its stdout. Daemon threads are then started to drain
+     * its stdout and stderr.
+     *
+     * @throws IOException if the process cannot be started
+     */
+    public void run() throws IOException {
+        // Run the executable
+        ProcessBuilder processBuilder = new ProcessBuilder(argv);
+        process = processBuilder.start();
+        LOG.debug("Started the process for command: " + command);
+
+        // Pick up the process' stdin/stdout/stderr streams
+        stdin =
+            new DataOutputStream(new BufferedOutputStream(process.getOutputStream()));
+        stdout =
+            new DataInputStream(new BufferedInputStream(process.getInputStream()));
+        stderr =
+            new DataInputStream(new BufferedInputStream(process.getErrorStream()));
+
+        // Attach the serializer to the stdin of the process for sending tuples
+        serializer.bindTo(stdin);
+
+        // Attach the deserializer to the stdout of the process to get tuples
+        deserializer.bindTo("", new BufferedPositionedInputStream(stdout), 0,
+                            Long.MAX_VALUE);
+
+        // Start the threads to process the executable's stdout and stderr
+        stdoutThread = new ProcessOutputThread(deserializer);
+        stdoutThread.start();
+        stderrThread = new ProcessErrorThread();
+        stderrThread.start();
+    }
+
+    /**
+     * Serializes <code>d</code>, which must be a {@link Tuple}, to the
+     * process' stdin and flushes it.
+     */
+    public void add(Datum d) throws IOException {
+        // Pass the serialized tuple to the executable
+        serializer.putNext((Tuple)d);
+        stdin.flush();
+    }
+
+    /**
+     * Workhorse to process the stdout of the managed process.
+     *
+     * The <code>PigExecutableManager</code>, by default, just pushes the received
+     * <code>Datum</code> into eval-pipeline to be processed by the successor.
+     *
+     * @param d <code>Datum</code> to process
+     */
+    protected void processOutput(Datum d) {
+        endOfPipe.add(d);
+    }
+
+    /** Reads tuples from the process' stdout and hands them to processOutput. */
+    class ProcessOutputThread extends Thread {
+
+        LoadFunc deserializer;
+
+        ProcessOutputThread(LoadFunc deserializer) {
+            setDaemon(true);
+            this.deserializer = deserializer;
+        }
+
+        public void run() {
+            try {
+                // Read tuples from the executable and push them down the pipe
+                Tuple tuple = null;
+                while ((tuple = deserializer.getNext()) != null) {
+                    processOutput(tuple);
+                }
+
+                if (stdout != null) {
+                    stdout.close();
+                    LOG.debug("ProcessOutputThread done");
+                }
+            } catch (Throwable th) {
+                LOG.warn(th);
+                try {
+                    if (stdout != null) {
+                        stdout.close();
+                    }
+                } catch (IOException ioe) {
+                    LOG.info(ioe);
+                }
+                throw new RuntimeException(th);
+            }
+        }
+    }
+
+    /**
+     * Workhorse to process the stderr stream of the managed process.
+     *
+     * By default <code>PigExecutableManager</code> just sends out the received
+     * error message to the <code>stderr</code> of itself.
+     *
+     * @param error error message from the managed process.
+     */
+    protected void processError(String error) {
+        // Just send it out to our stderr
+        System.err.println(error);
+    }
+
+    /** Reads the process' stderr line by line and hands it to processError. */
+    class ProcessErrorThread extends Thread {
+
+        public ProcessErrorThread() {
+            setDaemon(true);
+        }
+
+        public void run() {
+            try {
+                String error;
+                BufferedReader reader =
+                    new BufferedReader(new InputStreamReader(stderr));
+                while ((error = reader.readLine()) != null) {
+                    processError(error);
+                }
+
+                if (stderr != null) {
+                    stderr.close();
+                    LOG.debug("ProcessErrorThread done");
+                }
+            } catch (Throwable th) {
+                LOG.warn(th);
+                try {
+                    if (stderr != null) {
+                        stderr.close();
+                    }
+                } catch (IOException ioe) {
+                    LOG.info(ioe);
+                }
+                // Rethrow the original failure (previously this was only
+                // reached when closing stderr also failed)
+                throw new RuntimeException(th);
+            }
+        }
+    }
+}
Added: incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java?rev=634380&view=auto
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java (added)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java Thu Mar 6 11:11:26 2008
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.pig.PigServer;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.*;
+import static org.apache.pig.PigServer.ExecType.MAPREDUCE;
+
+import junit.framework.TestCase;
+
+public class TestStreaming extends TestCase {
+
+    MiniCluster cluster = MiniCluster.buildCluster();
+
+    private static final String simpleEchoStreamingCommand =
+        "perl -ne 'chomp $_; print \"$_\n\"'";
+
+    /** Two-column "letter,number" input shared by the simple tests. */
+    private static final String[] SIMPLE_INPUT =
+        new String[] {"A,1", "B,2", "C,3", "D,2",
+                      "A,5", "B,5", "C,8", "A,8",
+                      "D,8", "A,9"};
+
+    /**
+     * Builds the expected two-field tuples; tuple i is
+     * (firstField[i], secondField[i]).
+     */
+    private Tuple[] setupExpectedResults(String[] firstField, int[] secondField) {
+        Assert.assertEquals(firstField.length, secondField.length);
+
+        Tuple[] expectedResults = new Tuple[firstField.length];
+        for (int i = 0; i < expectedResults.length; ++i) {
+            expectedResults[i] = new Tuple(2);
+            expectedResults[i].setField(0, firstField[i]);
+            expectedResults[i].setField(1, secondField[i]);
+        }
+
+        return expectedResults;
+    }
+
+    @Test
+    public void testSimpleMapSideStreaming() throws Exception {
+        PigServer pigServer = new PigServer(MAPREDUCE);
+
+        File input = Util.createInputFile("tmp", "", SIMPLE_INPUT);
+
+        // Expected results
+        String[] expectedFirstFields = new String[] {"A", "B", "C", "A", "D", "A"};
+        int[] expectedSecondFields = new int[] {5, 5, 8, 8, 8, 9};
+        Tuple[] expectedResults =
+            setupExpectedResults(expectedFirstFields, expectedSecondFields);
+
+        // Pig query to run
+        pigServer.registerQuery("INPUT = load 'file:" + input + "' using " +
+                                PigStorage.class.getName() + "(',');");
+        pigServer.registerQuery("FILTERED_DATA = filter INPUT by $1 > '3';");
+        pigServer.registerQuery("OUTPUT = stream FILTERED_DATA through `" +
+                                simpleEchoStreamingCommand + "`;");
+
+        // Run the query and check the results
+        Util.checkQueryOutputs(pigServer.openIterator("OUTPUT"), expectedResults);
+    }
+
+    @Test
+    public void testSimpleMapSideStreamingWithOutputSchema() throws Exception {
+        PigServer pigServer = new PigServer(MAPREDUCE);
+
+        File input = Util.createInputFile("tmp", "", SIMPLE_INPUT);
+
+        // Expected results
+        String[] expectedFirstFields = new String[] {"C", "A", "D", "A"};
+        int[] expectedSecondFields = new int[] {8, 8, 8, 9};
+        Tuple[] expectedResults =
+            setupExpectedResults(expectedFirstFields, expectedSecondFields);
+
+        // Pig query to run
+        pigServer.registerQuery("INPUT = load 'file:" + input + "' using " +
+                                PigStorage.class.getName() + "(',');");
+        pigServer.registerQuery("FILTERED_DATA = filter INPUT by $1 > '3';");
+        pigServer.registerQuery("STREAMED_DATA = stream FILTERED_DATA through `" +
+                                simpleEchoStreamingCommand + "` as (f0, f1);");
+        pigServer.registerQuery("OUTPUT = filter STREAMED_DATA by f1 > '6';");
+
+        // Run the query and check the results
+        Util.checkQueryOutputs(pigServer.openIterator("OUTPUT"), expectedResults);
+    }
+
+    @Test
+    public void testSimpleReduceSideStreamingAfterFlatten() throws Exception {
+        PigServer pigServer = new PigServer(MAPREDUCE);
+
+        File input = Util.createInputFile("tmp", "", SIMPLE_INPUT);
+
+        // Expected results
+        String[] expectedFirstFields = new String[] {"A", "A", "A", "B", "C", "D"};
+        int[] expectedSecondFields = new int[] {5, 8, 9, 5, 8, 8};
+        Tuple[] expectedResults =
+            setupExpectedResults(expectedFirstFields, expectedSecondFields);
+
+        // Pig query to run
+        pigServer.registerQuery("INPUT = load 'file:" + input + "' using " +
+                                PigStorage.class.getName() + "(',');");
+        pigServer.registerQuery("FILTERED_DATA = filter INPUT by $1 > '3';");
+        pigServer.registerQuery("GROUPED_DATA = group FILTERED_DATA by $0;");
+        pigServer.registerQuery("FLATTENED_GROUPED_DATA = foreach GROUPED_DATA " +
+                                "generate flatten($1);");
+        pigServer.registerQuery("OUTPUT = stream FLATTENED_GROUPED_DATA through `" +
+                                simpleEchoStreamingCommand + "`;");
+
+        // Run the query and check the results
+        Util.checkQueryOutputs(pigServer.openIterator("OUTPUT"), expectedResults);
+    }
+
+    @Test
+    public void testSimpleOrderedReduceSideStreamingAfterFlatten() throws Exception {
+        PigServer pigServer = new PigServer(MAPREDUCE);
+
+        File input = Util.createInputFile("tmp", "",
+                                          new String[] {"A,1,2,3", "B,2,4,5",
+                                                        "C,3,1,2", "D,2,5,2",
+                                                        "A,5,5,1", "B,5,7,4",
+                                                        "C,8,9,2", "A,8,4,5",
+                                                        "D,8,8,3", "A,9,2,5"}
+                                         );
+
+        // Expected results: every input row, grouped by $0 and ordered by
+        // ($2, $3) within each group
+        String[] expectedFirstFields =
+            new String[] {"A", "A", "A", "A", "B", "B", "C", "C", "D", "D"};
+        int[] expectedSecondFields = new int[] {1, 9, 8, 5, 2, 5, 3, 8, 2, 8};
+        int[] expectedThirdFields = new int[] {2, 2, 4, 5, 4, 7, 1, 9, 5, 8};
+        int[] expectedFourthFields = new int[] {3, 5, 5, 1, 5, 4, 2, 2, 2, 3};
+        Tuple[] expectedResults = new Tuple[expectedFirstFields.length];
+        for (int i = 0; i < expectedResults.length; ++i) {
+            expectedResults[i] = new Tuple(4);
+            expectedResults[i].setField(0, expectedFirstFields[i]);
+            expectedResults[i].setField(1, expectedSecondFields[i]);
+            expectedResults[i].setField(2, expectedThirdFields[i]);
+            expectedResults[i].setField(3, expectedFourthFields[i]);
+        }
+
+        // Pig query to run
+        pigServer.registerQuery("INPUT = load 'file:" + input + "' using " +
+                                PigStorage.class.getName() + "(',');");
+        pigServer.registerQuery("GROUPED_DATA = group INPUT by $0;");
+        pigServer.registerQuery("ORDERED_DATA = foreach GROUPED_DATA { " +
+                                "  D = order INPUT BY $2, $3;" +
+                                "  generate flatten(D);" +
+                                "};");
+        pigServer.registerQuery("OUTPUT = stream ORDERED_DATA through `" +
+                                simpleEchoStreamingCommand + "`;");
+
+        // Run the query and check the results
+        Util.checkQueryOutputs(pigServer.openIterator("OUTPUT"), expectedResults);
+    }
+
+}
Modified: incubator/pig/trunk/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/Util.java?rev=634380&r1=634379&r2=634380&view=diff
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ incubator/pig/trunk/test/org/apache/pig/test/Util.java Thu Mar 6 11:11:26 2008
@@ -17,9 +17,13 @@
*/
package org.apache.pig.test;
+import java.io.File;
import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Iterator;
import org.apache.pig.data.*;
+import org.junit.Assert;
public class Util {
// Helper Functions
@@ -68,4 +72,43 @@
}
return t;
}
+
+    /**
+     * Helper to create a temporary file with given input data for use in test cases.
+     * The file is scheduled for deletion when the JVM exits.
+     *
+     * @param tmpFilenamePrefix file-name prefix (at least three characters,
+     *                          as required by File.createTempFile)
+     * @param tmpFilenameSuffix file-name suffix
+     * @param inputData input for test cases, each string in inputData[] is written
+     *                  on one line
+     * @return {@link File} handle to the created temporary file
+     * @throws IOException if the file cannot be created or written
+     */
+    static public File createInputFile(String tmpFilenamePrefix,
+                                       String tmpFilenameSuffix,
+                                       String[] inputData)
+    throws IOException {
+        File f = File.createTempFile(tmpFilenamePrefix, tmpFilenameSuffix);
+        // Don't leave test fixtures behind in the temp directory
+        f.deleteOnExit();
+
+        PrintWriter pw = new PrintWriter(f);
+        try {
+            for (String line : inputData) {
+                pw.println(line);
+            }
+            // PrintWriter never throws on write failure; check explicitly
+            if (pw.checkError()) {
+                throw new IOException("Failed to write test input to " + f);
+            }
+        } finally {
+            pw.close();
+        }
+        return f;
+    }
+
+ /**
+ * Helper function to check if the result of a Pig Query is in line with
+ * expected results.
+ *
+ * @param actualResults Result of the executed Pig query
+ * @param expectedResults Expected results to validate against
+ */
+ static public void checkQueryOutputs(Iterator<Tuple> actualResults,
+ Tuple[] expectedResults) {
+ for (Tuple expected : expectedResults) {
+ Tuple actual = actualResults.next();
+ Assert.assertEquals(expected, actual);
+ }
+ }
+
}