You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/09/10 22:00:36 UTC
svn commit: r693961 - in /incubator/pig/branches/types:
src/org/apache/pig/impl/logicalLayer/
src/org/apache/pig/impl/logicalLayer/optimizer/ test/org/apache/pig/test/
Author: olga
Date: Wed Sep 10 13:00:36 2008
New Revision: 693961
URL: http://svn.apache.org/viewvc?rev=693961&view=rev
Log:
streaming merge
Added:
incubator/pig/branches/types/test/org/apache/pig/test/PigExecTestCase.java
incubator/pig/branches/types/test/org/apache/pig/test/TestStreaming.java
Modified:
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/PlanSetter.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java
incubator/pig/branches/types/test/org/apache/pig/test/Util.java
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/PlanSetter.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/PlanSetter.java?rev=693961&r1=693960&r2=693961&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/PlanSetter.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/PlanSetter.java Wed Sep 10 13:00:36 2008
@@ -182,6 +182,10 @@
public void visit(LOUserFunc op) throws VisitorException {
op.setPlan(mCurrentWalker.getPlan());
}
+
+    /**
+     * Sets the plan of the given stream operator to the plan currently being
+     * walked, exactly as visit(LOUserFunc) does for user functions.
+     *
+     * @param op the stream operator being visited
+     * @throws VisitorException declared for consistency with the other visit methods
+     */
+ public void visit(LOStream op) throws VisitorException {
+ op.setPlan(mCurrentWalker.getPlan());
+ }
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java?rev=693961&r1=693960&r2=693961&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java Wed Sep 10 13:00:36 2008
@@ -30,6 +30,9 @@
* An optimizer for logical plans.
*/
public class LogicalOptimizer extends PlanOptimizer<LogicalOperator, LogicalPlan> {
+
+    /**
+     * Fully-qualified class name of {@code LOLoad}; used as the pattern node of
+     * the load type-cast rule and passed to {@link TypeCastInserter} for it.
+     */
+ public static final String LOLOAD_CLASSNAME = "org.apache.pig.impl.logicalLayer.LOLoad";
+    /**
+     * Fully-qualified class name of {@code LOStream}; used as the pattern node of
+     * the stream type-cast rule and passed to {@link TypeCastInserter} for it.
+     */
+ public static final String LOSTREAM_CLASSNAME = "org.apache.pig.impl.logicalLayer.LOStream";
public LogicalOptimizer(LogicalPlan plan) {
super(plan);
@@ -53,12 +56,22 @@
// Add type casting to plans where the schema has been declared (by
// user, data, or data catalog).
nodes = new ArrayList<String>(1);
- nodes.add("org.apache.pig.impl.logicalLayer.LOLoad");
+ nodes.add(LOLOAD_CLASSNAME);
edges = new HashMap<Integer, Integer>();
required = new ArrayList<Boolean>(1);
required.add(true);
mRules.add(new Rule<LogicalOperator, LogicalPlan>(nodes, edges, required,
- new TypeCastInserter(plan)));
+ new TypeCastInserter(plan, LOLOAD_CLASSNAME)));
+
+ // Add type casting to plans where the schema has been declared by
+ // user in a statement with stream operator.
+ nodes = new ArrayList<String>(1);
+ nodes.add(LOSTREAM_CLASSNAME);
+ edges = new HashMap<Integer, Integer>();
+ required = new ArrayList<Boolean>(1);
+ required.add(true);
+ mRules.add(new Rule(nodes, edges, required,
+ new TypeCastInserter(plan, LOSTREAM_CLASSNAME)));
// Push up limit where ever possible.
nodes = new ArrayList<String>(1);
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java?rev=693961&r1=693960&r2=693961&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java Wed Sep 10 13:00:36 2008
@@ -73,7 +73,7 @@
public void transform(List<LogicalOperator> nodes) throws OptimizerException {
LogicalOperator lo = nodes.get(0);
if (lo == null || !(lo instanceof LOLimit)) {
- throw new RuntimeException("Expected load, got " +
+ throw new RuntimeException("Expected limit, got " +
lo.getClass().getName());
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java?rev=693961&r1=693960&r2=693961&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java Wed Sep 10 13:00:36 2008
@@ -27,10 +27,9 @@
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.LOCast;
import org.apache.pig.impl.logicalLayer.LOForEach;
-import org.apache.pig.impl.logicalLayer.LOGenerate;
import org.apache.pig.impl.logicalLayer.LOLoad;
import org.apache.pig.impl.logicalLayer.LOProject;
-import org.apache.pig.impl.logicalLayer.LOVisitor;
+import org.apache.pig.impl.logicalLayer.LOStream;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -38,9 +37,6 @@
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.optimizer.OptimizerException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
/**
* A visitor to discover if any schema has been specified for a file being
* loaded. If so, a projection will be injected into the plan to cast the
@@ -52,23 +48,17 @@
*/
public class TypeCastInserter extends LogicalTransformer {
- private static final Log log = LogFactory.getLog(TypeCastInserter.class);
+    /**
+     * Fully-qualified class name of the operator this inserter handles; expected
+     * to be {@link LogicalOptimizer#LOLOAD_CLASSNAME} or
+     * {@link LogicalOptimizer#LOSTREAM_CLASSNAME}. Set once by the constructor.
+     */
+    private final String operatorClassName;
- public TypeCastInserter(LogicalPlan plan) {
+    /**
+     * Creates a type-cast inserter for one kind of schema-carrying operator.
+     *
+     * @param plan the logical plan to transform
+     * @param operatorClassName the operator kind the matched node must be; any
+     *        value other than the two LogicalOptimizer class-name constants is
+     *        rejected with a FrontendException when a node is checked or transformed
+     */
+    public TypeCastInserter(LogicalPlan plan, String operatorClassName) {
super(plan, new DepthFirstWalker<LogicalOperator, LogicalPlan>(plan));
+        this.operatorClassName = operatorClassName;
}
@Override
public boolean check(List<LogicalOperator> nodes) throws OptimizerException {
try {
- LogicalOperator lo = nodes.get(0);
- if (lo == null || !(lo instanceof LOLoad)) {
- throw new RuntimeException("Expected load, got " +
- lo.getClass().getName());
- }
-
- LOLoad load = (LOLoad)lo;
- Schema s = load.getSchema();
+ Schema s = getOperator(nodes).getSchema();
if (s == null) return false;
boolean sawOne = false;
@@ -86,20 +76,36 @@
" check if type casts are needed", fe);
}
}
-
- @Override
- public void transform(List<LogicalOperator> nodes) throws OptimizerException {
- try {
- LogicalOperator lo = nodes.get(0);
+
+    /**
+     * Returns the single operator matched by this rule after verifying it is the
+     * kind this inserter was constructed for.
+     *
+     * @param nodes the operators matched by the rule; only the first is used
+     * @return the matched {@code LOLoad} or {@code LOStream} operator
+     * @throws FrontendException if {@code operatorClassName} is neither
+     *         {@link LogicalOptimizer#LOLOAD_CLASSNAME} nor
+     *         {@link LogicalOptimizer#LOSTREAM_CLASSNAME}
+     */
+    private LogicalOperator getOperator(List<LogicalOperator> nodes) throws FrontendException {
+        LogicalOperator lo = nodes.get(0);
+        // Compare by value: '==' only works while every caller passes the
+        // interned LogicalOptimizer constants. Constant-first equals is null-safe.
+        if (LogicalOptimizer.LOLOAD_CLASSNAME.equals(operatorClassName)) {
+            // Reject null first so the message below never dereferences it.
+            if (lo == null) {
+                throw new RuntimeException("Expected load, got null");
+            }
if (lo == null || !(lo instanceof LOLoad)) {
throw new RuntimeException("Expected load, got " +
lo.getClass().getName());
}
-
- LOLoad load = (LOLoad)lo;
- Schema s = load.getSchema();
- String scope = load.getOperatorKey().scope;
+            return lo;
+        } else if (LogicalOptimizer.LOSTREAM_CLASSNAME.equals(operatorClassName)) {
+            if (!(lo instanceof LOStream)) {
+                throw new RuntimeException("Expected stream, got " +
+                        (lo == null ? "null" : lo.getClass().getName()));
+            }
+            return lo;
+        } else {
+            // we should never be called with any other operator class name
+            throw new FrontendException("TypeCastInserter invoked with an invalid operator class name:" + operatorClassName);
+        }
+    }
+
+ @Override
+ public void transform(List<LogicalOperator> nodes) throws OptimizerException {
+ try {
+ LogicalOperator lo = getOperator(nodes);
+ Schema s = lo.getSchema();
+ String scope = lo.getOperatorKey().scope;
// For every field, build a logical plan. If the field has a type
// other than byte array, then the plan will be cast(project). Else
// it will just be project.
@@ -113,7 +119,7 @@
List<Integer> toProject = new ArrayList<Integer>(1);
toProject.add(i);
LOProject proj = new LOProject(p, OperatorKey.genOpKey(scope),
- load, toProject);
+ lo, toProject);
p.add(proj);
Schema.FieldSchema fs = s.getField(i);
if (fs.type != DataType.BYTEARRAY) {
@@ -136,7 +142,7 @@
OperatorKey.genOpKey(scope), genPlans, flattens);
// Insert the foreach into the plan and patch up the plan.
- insertAfter(load, foreach, null);
+ insertAfter(lo, foreach, null);
rebuildSchemas();
Added: incubator/pig/branches/types/test/org/apache/pig/test/PigExecTestCase.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/PigExecTestCase.java?rev=693961&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/PigExecTestCase.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/PigExecTestCase.java Wed Sep 10 13:00:36 2008
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import static org.apache.pig.ExecType.MAPREDUCE;
+import static org.apache.pig.ExecType.LOCAL;
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigServer;
+import org.apache.pig.ExecType;
+import org.junit.After;
+import org.junit.Before;
+
+/**
+ * Base class for tests that need a {@link PigServer}. The execution mode comes
+ * from the {@code test.exectype} system property and defaults to MapReduce,
+ * in which case the server is configured from a {@link MiniCluster}.
+ */
+public abstract class PigExecTestCase extends TestCase {
+
+    protected final Log log = LogFactory.getLog(getClass());
+
+    /** Execution mode for the current test; replaced by {@code -Dtest.exectype} when set. */
+    protected ExecType execType = MAPREDUCE;
+
+    private MiniCluster cluster;
+    protected PigServer pigServer;
+
+    /**
+     * Resolves the execution mode and creates {@link #pigServer} for it.
+     */
+    @Before
+    @Override
+    protected void setUp() throws Exception {
+        String execTypeString = System.getProperty("test.exectype");
+        if (execTypeString != null && execTypeString.length() > 0) {
+            execType = PigServer.parseExecType(execTypeString);
+        }
+        if (execType == MAPREDUCE) {
+            cluster = MiniCluster.buildCluster();
+            pigServer = new PigServer(MAPREDUCE, cluster.getProperties());
+        } else {
+            pigServer = new PigServer(LOCAL);
+        }
+    }
+
+    /**
+     * Shuts down the PigServer created by {@link #setUp()}, if any.
+     */
+    @After
+    @Override
+    protected void tearDown() throws Exception {
+        // Guard against a server that was never created (e.g. setUp failed
+        // under a runner that still invokes tearDown).
+        if (pigServer != null) {
+            pigServer.shutdown();
+            pigServer = null;
+        }
+    }
+}
Added: incubator/pig/branches/types/test/org/apache/pig/test/TestStreaming.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestStreaming.java?rev=693961&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestStreaming.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestStreaming.java Wed Sep 10 13:00:36 2008
@@ -0,0 +1,595 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.DefaultTupleFactory;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * End-to-end tests for the Pig {@code stream} operator: map- and reduce-side
+ * streaming, declared output schemas, ship/cache specifications and custom
+ * input/output specifications. Queries run on the {@code pigServer} created by
+ * {@link PigExecTestCase}.
+ */
+public class TestStreaming extends PigExecTestCase {
+
+    private final TupleFactory tf = DefaultTupleFactory.getInstance();
+
+    /**
+     * Perl one-liner that echoes each input line unchanged. On Windows the
+     * inner double quotes are additionally backslash-escaped.
+     */
+    private static final String simpleEchoStreamingCommand;
+    static {
+        if (System.getProperty("os.name").toUpperCase().startsWith("WINDOWS")) {
+            simpleEchoStreamingCommand = "perl -ne 'print \\\"$_\\\"'";
+        } else {
+            simpleEchoStreamingCommand = "perl -ne 'print \"$_\"'";
+        }
+    }
+
+    /** Creates a new temp file holding the ten "letter,number" rows most tests use. */
+    private static File createSimpleInput() throws Exception {
+        return Util.createInputFile("tmp", "",
+                                    new String[] {"A,1", "B,2", "C,3", "D,2",
+                                                  "A,5", "B,5", "C,8", "A,8",
+                                                  "D,8", "A,9"});
+    }
+
+    /** Builds the query that loads {@code input} as comma-separated data into alias IP. */
+    private static String loadQuery(File input) {
+        return "IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using " +
+               PigStorage.class.getName() + "(',');";
+    }
+
+    /**
+     * Zips two equally long arrays into two-field tuples.
+     */
+    private Tuple[] setupExpectedResults(Object[] firstField, Object[] secondField) throws ExecException {
+        Assert.assertEquals(firstField.length, secondField.length);
+
+        Tuple[] expectedResults = new Tuple[firstField.length];
+        for (int i = 0; i < expectedResults.length; ++i) {
+            expectedResults[i] = tf.newTuple(2);
+            expectedResults[i].set(0, firstField[i]);
+            expectedResults[i].set(1, secondField[i]);
+        }
+        return expectedResults;
+    }
+
+    /**
+     * Builds two-field expected tuples. With {@code typed} the values are used
+     * as-is; otherwise each value is converted to a DataByteArray, matching
+     * untyped (bytearray) output.
+     */
+    private Tuple[] expectedResultsFor(boolean typed, Object[] firstField, Object[] secondField)
+            throws ExecException {
+        if (typed) {
+            return setupExpectedResults(firstField, secondField);
+        }
+        return setupExpectedResults(Util.toDataByteArrays(firstField),
+                                    Util.toDataByteArrays(secondField));
+    }
+
+    /**
+     * Reads a file stored with {@code PigStorage(',')} back into tuples and
+     * always closes the underlying stream.
+     */
+    private List<Tuple> readStoredOutput(String path) throws Exception {
+        InputStream op = FileLocalizer.open(path, pigServer.getPigContext());
+        try {
+            PigStorage ps = new PigStorage(",");
+            ps.bindTo("", new BufferedPositionedInputStream(op), 0, Long.MAX_VALUE);
+            List<Tuple> outputs = new ArrayList<Tuple>();
+            Tuple t;
+            while ((t = ps.getNext()) != null) {
+                outputs.add(t);
+            }
+            return outputs;
+        } finally {
+            op.close();
+        }
+    }
+
+    @Test
+    public void testSimpleMapSideStreaming() throws Exception {
+        File input = createSimpleInput();
+
+        String[] expectedFirstFields = new String[] {"A", "B", "C", "A", "D", "A"};
+        Integer[] expectedSecondFields = new Integer[] {5, 5, 8, 8, 8, 9};
+        // Run once with a typed output schema and once without.
+        for (boolean withTypes : new boolean[] {true, false}) {
+            Tuple[] expectedResults =
+                expectedResultsFor(withTypes, expectedFirstFields, expectedSecondFields);
+
+            pigServer.registerQuery(loadQuery(input));
+            pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
+            pigServer.registerQuery("S1 = stream FILTERED_DATA through `" +
+                                    simpleEchoStreamingCommand + "`;");
+            if (withTypes) {
+                pigServer.registerQuery("OP = stream S1 through `" +
+                                        simpleEchoStreamingCommand + "` as (f0:chararray, f1:int);");
+            } else {
+                pigServer.registerQuery("OP = stream S1 through `" +
+                                        simpleEchoStreamingCommand + "`;");
+            }
+
+            Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
+        }
+    }
+
+    @Test
+    public void testSimpleMapSideStreamingWithOutputSchema() throws Exception {
+        File input = createSimpleInput();
+
+        Object[] expectedFirstFields = new String[] {"C", "A", "D", "A"};
+        Object[] expectedSecondFields = new Integer[] {8, 8, 8, 9};
+        for (boolean withTypes : new boolean[] {true, false}) {
+            Tuple[] expectedResults =
+                expectedResultsFor(withTypes, expectedFirstFields, expectedSecondFields);
+
+            pigServer.registerQuery(loadQuery(input));
+            pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
+            if (withTypes) {
+                pigServer.registerQuery("STREAMED_DATA = stream FILTERED_DATA through `" +
+                                        simpleEchoStreamingCommand + "` as (f0:chararray, f1:int);");
+            } else {
+                pigServer.registerQuery("STREAMED_DATA = stream FILTERED_DATA through `" +
+                                        simpleEchoStreamingCommand + "` as (f0, f1);");
+            }
+            pigServer.registerQuery("OP = filter STREAMED_DATA by f1 > 6;");
+
+            Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
+        }
+    }
+
+    @Test
+    public void testSimpleReduceSideStreamingAfterFlatten() throws Exception {
+        File input = createSimpleInput();
+
+        String[] expectedFirstFields = new String[] {"A", "A", "A", "B", "C", "D"};
+        Integer[] expectedSecondFields = new Integer[] {5, 8, 9, 5, 8, 8};
+        for (boolean withTypes : new boolean[] {true, false}) {
+            Tuple[] expectedResults =
+                expectedResultsFor(withTypes, expectedFirstFields, expectedSecondFields);
+
+            pigServer.registerQuery(loadQuery(input));
+            pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
+            pigServer.registerQuery("GROUPED_DATA = group FILTERED_DATA by $0;");
+            pigServer.registerQuery("FLATTENED_GROUPED_DATA = foreach GROUPED_DATA " +
+                                    "generate flatten($1);");
+            pigServer.registerQuery("S1 = stream FLATTENED_GROUPED_DATA through `" +
+                                    simpleEchoStreamingCommand + "`;");
+            if (withTypes) {
+                pigServer.registerQuery("OP = stream S1 through `" +
+                                        simpleEchoStreamingCommand + "` as (f0:chararray, f1:int);");
+            } else {
+                pigServer.registerQuery("OP = stream S1 through `" +
+                                        simpleEchoStreamingCommand + "`;");
+            }
+
+            Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
+        }
+    }
+
+    @Test
+    public void testSimpleOrderedReduceSideStreamingAfterFlatten() throws Exception {
+        File input = Util.createInputFile("tmp", "",
+                                          new String[] {"A,1,2,3", "B,2,4,5",
+                                                        "C,3,1,2", "D,2,5,2",
+                                                        "A,5,5,1", "B,5,7,4",
+                                                        "C,8,9,2", "A,8,4,5",
+                                                        "D,8,8,3", "A,9,2,5"});
+
+        String[] expectedFirstFields =
+            new String[] {"A", "A", "A", "A", "B", "B", "C", "C", "D", "D"};
+        Integer[] expectedSecondFields = new Integer[] {1, 9, 8, 5, 2, 5, 3, 8, 2, 8};
+        Integer[] expectedThirdFields = new Integer[] {2, 2, 4, 5, 4, 7, 1, 9, 5, 8};
+        Integer[] expectedFourthFields = new Integer[] {3, 5, 5, 1, 5, 4, 2, 2, 2, 3};
+        Tuple[] expectedResults = new Tuple[10];
+        for (int i = 0; i < expectedResults.length; ++i) {
+            expectedResults[i] = tf.newTuple(4);
+            expectedResults[i].set(0, expectedFirstFields[i]);
+            expectedResults[i].set(1, expectedSecondFields[i]);
+            expectedResults[i].set(2, expectedThirdFields[i]);
+            expectedResults[i].set(3, expectedFourthFields[i]);
+        }
+
+        // S1 and S2 are registered, but OP is derived from IP via GROUPED_DATA,
+        // not from S2.
+        pigServer.registerQuery(loadQuery(input));
+        pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
+        pigServer.registerQuery("S1 = stream FILTERED_DATA through `" +
+                                simpleEchoStreamingCommand + "`;");
+        pigServer.registerQuery("S2 = stream S1 through `" +
+                                simpleEchoStreamingCommand + "`;");
+        pigServer.registerQuery("GROUPED_DATA = group IP by $0;");
+        pigServer.registerQuery("ORDERED_DATA = foreach GROUPED_DATA { " +
+                                " D = order IP BY $2, $3;" +
+                                " generate flatten(D);" +
+                                "};");
+        pigServer.registerQuery("S3 = stream ORDERED_DATA through `" +
+                                simpleEchoStreamingCommand + "`;");
+        pigServer.registerQuery("OP = stream S3 through `" +
+                                simpleEchoStreamingCommand + "` as (f0:chararray, f1:int, f2:int, f3:int);");
+
+        Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
+    }
+
+    @Test
+    public void testInputShipSpecs() throws Exception {
+        // FIXME : this should be tested in all modes
+        if (execType == ExecType.LOCAL) {
+            return;
+        }
+        File input = createSimpleInput();
+
+        // Perl script: echoes the file named by its first argument to stdout
+        // and mirrors each line to stderr.
+        String[] script =
+            new String[] {
+                "#!/usr/bin/perl",
+                "open(INFILE, $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";",
+                "while (<INFILE>) {",
+                " chomp $_;",
+                " print STDOUT \"$_\n\";",
+                " print STDERR \"STDERR: $_\n\";",
+                "}",
+            };
+        File command1 = Util.createInputFile("script", "pl", script);
+        File command2 = Util.createInputFile("script", "pl", script);
+
+        Tuple[] expectedResults = expectedResultsFor(false,
+                new String[] {"A", "B", "C", "A", "D", "A"},
+                new Integer[] {5, 5, 8, 8, 8, 9});
+
+        pigServer.registerQuery(
+                "define CMD1 `" + command1.getName() + " foo` " +
+                "ship ('" + Util.encodeEscape(command1.toString()) + "') " +
+                "input('foo' using " + PigStorage.class.getName() + "(',')) " +
+                "output(stdout using " + PigStorage.class.getName() + "(',')) " +
+                "stderr();");
+        pigServer.registerQuery(
+                "define CMD2 `" + command2.getName() + " bar` " +
+                "ship ('" + Util.encodeEscape(command2.toString()) + "') " +
+                "input('bar' using " + PigStorage.class.getName() + "(',')) " +
+                "output(stdout using " + PigStorage.class.getName() + "(',')) " +
+                "stderr();");
+        pigServer.registerQuery(loadQuery(input));
+        pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;");
+        pigServer.registerQuery("STREAMED_DATA = stream FILTERED_DATA " +
+                                "through CMD1;");
+        pigServer.registerQuery("OP = stream STREAMED_DATA through CMD2;");
+
+        String output = "/pig/out";
+        pigServer.deleteFile(output);
+        pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
+
+        Util.checkQueryOutputs(readStoredOutput(output).iterator(), expectedResults);
+    }
+
+    @Test
+    public void testInputCacheSpecs() throws Exception {
+        // Can't run this without HDFS
+        if (execType == ExecType.LOCAL) {
+            return;
+        }
+        File input = createSimpleInput();
+
+        // Perl script: echoes the file named by its first argument to stdout
+        // and mirrors each line to stderr.
+        String[] script =
+            new String[] {
+                "#!/usr/bin/perl",
+                "open(INFILE, $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";",
+                "while (<INFILE>) {",
+                " chomp $_;",
+                " print STDOUT \"$_\n\";",
+                " print STDERR \"STDERR: $_\n\";",
+                "}",
+            };
+        // Copy the scripts to HDFS
+        File command1 = Util.createInputFile("script", "pl", script);
+        File command2 = Util.createInputFile("script", "pl", script);
+        String c1 = FileLocalizer.hadoopify(command1.toString(),
+                                            pigServer.getPigContext());
+        String c2 = FileLocalizer.hadoopify(command2.toString(),
+                                            pigServer.getPigContext());
+
+        Tuple[] expectedResults = expectedResultsFor(false,
+                new String[] {"A", "B", "C", "A", "D", "A"},
+                new Integer[] {5, 5, 8, 8, 8, 9});
+
+        pigServer.registerQuery(
+                "define CMD1 `script1.pl foo` " +
+                "cache ('" + c1 + "#script1.pl') " +
+                "input('foo' using " + PigStorage.class.getName() + "(',')) " +
+                "stderr();");
+        pigServer.registerQuery(
+                "define CMD2 `script2.pl bar` " +
+                "cache ('" + c2 + "#script2.pl') " +
+                "input('bar' using " + PigStorage.class.getName() + "(',')) " +
+                "stderr();");
+        pigServer.registerQuery(loadQuery(input));
+        pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;");
+        pigServer.registerQuery("STREAMED_DATA = stream FILTERED_DATA " +
+                                "through CMD1;");
+        pigServer.registerQuery("OP = stream STREAMED_DATA through CMD2;");
+
+        String output = "/pig/out";
+        pigServer.deleteFile(output);
+        pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
+
+        Util.checkQueryOutputs(readStoredOutput(output).iterator(), expectedResults);
+    }
+
+    @Test
+    public void testOutputShipSpecs() throws Exception {
+        // FIXME : this should be tested in all modes
+        if (execType == ExecType.LOCAL) {
+            return;
+        }
+        File input = createSimpleInput();
+
+        // Perl script: copies stdin to the file named by its first argument and
+        // writes "A,10" to the second for every input line. The die messages
+        // name the same argument that failed to open.
+        String[] script =
+            new String[] {
+                "#!/usr/bin/perl",
+                "open(OUTFILE, \">\", $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";",
+                "open(OUTFILE2, \">\", $ARGV[1]) or die \"Can't open \".$ARGV[1].\"!: $!\";",
+                "while (<STDIN>) {",
+                " print OUTFILE \"$_\n\";",
+                " print STDERR \"STDERR: $_\n\";",
+                " print OUTFILE2 \"A,10\n\";",
+                "}",
+            };
+        File command = Util.createInputFile("script", "pl", script);
+
+        Tuple[] expectedResults = expectedResultsFor(false,
+                new String[] {"A", "A", "A", "A", "A", "A"},
+                new Integer[] {10, 10, 10, 10, 10, 10});
+
+        pigServer.registerQuery(
+                "define CMD `" + command.getName() + " foo bar` " +
+                "ship ('" + Util.encodeEscape(command.toString()) + "') " +
+                "output('foo' using " + PigStorage.class.getName() + "(','), " +
+                "'bar' using " + PigStorage.class.getName() + "(',')) " +
+                "stderr();");
+        pigServer.registerQuery(loadQuery(input));
+        pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;");
+        pigServer.registerQuery("OP = stream FILTERED_DATA through CMD;");
+
+        String output = "/pig/out";
+        pigServer.deleteFile(output);
+        pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
+
+        Util.checkQueryOutputs(readStoredOutput(output + "/bar").iterator(), expectedResults);
+    }
+
+    @Test
+    public void testInputOutputSpecs() throws Exception {
+        // FIXME : this should be tested in all modes
+        if (execType == ExecType.LOCAL) {
+            return;
+        }
+        File input = createSimpleInput();
+
+        // Perl script: copies the file named by its first argument to the files
+        // named by its second and third arguments, mirroring lines to stderr.
+        String[] script =
+            new String[] {
+                "#!/usr/bin/perl",
+                "open(INFILE, $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";",
+                "open(OUTFILE, \">\", $ARGV[1]) or die \"Can't open \".$ARGV[1].\"!: $!\";",
+                "open(OUTFILE2, \">\", $ARGV[2]) or die \"Can't open \".$ARGV[2].\"!: $!\";",
+                "while (<INFILE>) {",
+                " chomp $_;",
+                " print OUTFILE \"$_\n\";",
+                " print STDERR \"STDERR: $_\n\";",
+                " print OUTFILE2 \"$_\n\";",
+                "}",
+            };
+        File command = Util.createInputFile("script", "pl", script);
+
+        Tuple[] expectedResults = expectedResultsFor(false,
+                new String[] {"A", "B", "C", "A", "D", "A"},
+                new Integer[] {5, 5, 8, 8, 8, 9});
+
+        pigServer.registerQuery(
+                "define CMD `" + command.getName() + " foo bar foobar` " +
+                "ship ('" + Util.encodeEscape(command.toString()) + "') " +
+                "input('foo' using " + PigStorage.class.getName() + "(',')) " +
+                "output('bar', " +
+                "'foobar' using " + PigStorage.class.getName() + "(',')) " +
+                "stderr();");
+        pigServer.registerQuery(loadQuery(input));
+        pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;");
+        pigServer.registerQuery("OP = stream FILTERED_DATA through CMD;");
+
+        String output = "/pig/out";
+        pigServer.deleteFile(output);
+        pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
+
+        Util.checkQueryOutputs(readStoredOutput(output + "/foobar").iterator(), expectedResults);
+
+        // Cleanup
+        pigServer.deleteFile(output);
+    }
+
+    @Test
+    public void testSimpleMapSideStreamingWithUnixPipes() throws Exception {
+        File input = createSimpleInput();
+
+        String[] expectedFirstFields =
+            new String[] {"A", "B", "C", "D", "A", "B", "C", "A", "D", "A"};
+        Integer[] expectedSecondFields = new Integer[] {1, 2, 3, 2, 5, 5, 8, 8, 8, 9};
+        for (boolean withTypes : new boolean[] {true, false}) {
+            Tuple[] expectedResults =
+                expectedResultsFor(withTypes, expectedFirstFields, expectedSecondFields);
+
+            pigServer.registerQuery("define CMD `" + simpleEchoStreamingCommand +
+                                    " | " + simpleEchoStreamingCommand + "`;");
+            pigServer.registerQuery(loadQuery(input));
+            if (withTypes) {
+                pigServer.registerQuery("OP = stream IP through CMD as (f0:chararray, f1:int);");
+            } else {
+                pigServer.registerQuery("OP = stream IP through CMD;");
+            }
+
+            Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
+        }
+    }
+
+    @Test
+    public void testLocalNegativeLoadStoreOptimization() throws Exception {
+        testNegativeLoadStoreOptimization(ExecType.LOCAL);
+    }
+
+    @Test
+    public void testMRNegativeLoadStoreOptimization() throws Exception {
+        testNegativeLoadStoreOptimization(ExecType.MAPREDUCE);
+    }
+
+    /**
+     * Streams data loaded with {@code split by 'file'}, with and without a
+     * typed output schema.
+     * <p>
+     * NOTE(review): {@code requestedExecType} is not used. The query runs on the
+     * PigServer created in setUp, so both callers exercise the same mode.
+     * Likewise, CMD is defined but OP streams through the backtick command,
+     * not through CMD.
+     */
+    private void testNegativeLoadStoreOptimization(ExecType requestedExecType)
+            throws Exception {
+        File input = createSimpleInput();
+
+        String[] expectedFirstFields = new String[] {"A", "B", "C", "A", "D", "A"};
+        Integer[] expectedSecondFields = new Integer[] {5, 5, 8, 8, 8, 9};
+        for (boolean withTypes : new boolean[] {true, false}) {
+            Tuple[] expectedResults =
+                expectedResultsFor(withTypes, expectedFirstFields, expectedSecondFields);
+
+            pigServer.registerQuery("define CMD `"+ simpleEchoStreamingCommand +
+                                    "` input(stdin using PigDump);");
+            pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using " +
+                                    PigStorage.class.getName() + "(',') " +
+                                    "split by 'file';");
+            pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
+            if (withTypes) {
+                pigServer.registerQuery("OP = stream FILTERED_DATA through `" +
+                                        simpleEchoStreamingCommand + "` as (f0:chararray, f1:int);");
+            } else {
+                pigServer.registerQuery("OP = stream FILTERED_DATA through `" +
+                                        simpleEchoStreamingCommand + "`;");
+            }
+
+            Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
+        }
+    }
+}
Modified: incubator/pig/branches/types/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/Util.java?rev=693961&r1=693960&r2=693961&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/Util.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/Util.java Wed Sep 10 13:00:36 2008
@@ -122,10 +122,10 @@
return m;
}
- static public DataByteArray[] toDataByteArrays(String[] input) {
+    /**
+     * Converts each element to a DataByteArray holding the bytes of its
+     * {@code toString()}, encoded with the platform default charset
+     * (no-arg {@code getBytes()}). {@code null} elements stay {@code null}.
+     *
+     * @param input values to convert; the array itself must not be null
+     * @return a new array of the same length as {@code input}
+     */
+ static public<T> DataByteArray[] toDataByteArrays(T[] input) {
DataByteArray[] dbas = new DataByteArray[input.length];
for (int i = 0; i < input.length; i++) {
- dbas[i] = (input[i] == null)?null:new DataByteArray(input[i].getBytes());
+ dbas[i] = (input[i] == null)?null:new DataByteArray(input[i].toString().getBytes());
}
return dbas;
}
@@ -185,7 +185,7 @@
for (Tuple expected : expectedResults) {
Tuple actual = actualResults.next();
- Assert.assertEquals(expected.toString(), actual.toString());
+ Assert.assertEquals(expected, actual);
}
}