You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text rendering.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/09/10 22:00:36 UTC

svn commit: r693961 - in /incubator/pig/branches/types: src/org/apache/pig/impl/logicalLayer/ src/org/apache/pig/impl/logicalLayer/optimizer/ test/org/apache/pig/test/

Author: olga
Date: Wed Sep 10 13:00:36 2008
New Revision: 693961

URL: http://svn.apache.org/viewvc?rev=693961&view=rev
Log:
streaming merge

Added:
    incubator/pig/branches/types/test/org/apache/pig/test/PigExecTestCase.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestStreaming.java
Modified:
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/PlanSetter.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java
    incubator/pig/branches/types/test/org/apache/pig/test/Util.java

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/PlanSetter.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/PlanSetter.java?rev=693961&r1=693960&r2=693961&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/PlanSetter.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/PlanSetter.java Wed Sep 10 13:00:36 2008
@@ -182,6 +182,10 @@
     public void visit(LOUserFunc op) throws VisitorException {
         op.setPlan(mCurrentWalker.getPlan());
     }
+    // Stream operators get their plan from the current walker, exactly as LOUserFunc does above.
+    public void visit(LOStream op) throws VisitorException {
+        op.setPlan(mCurrentWalker.getPlan());
+    }
 
 }
 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java?rev=693961&r1=693960&r2=693961&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java Wed Sep 10 13:00:36 2008
@@ -30,6 +30,9 @@
  * An optimizer for logical plans.
  */
 public class LogicalOptimizer extends PlanOptimizer<LogicalOperator, LogicalPlan> {
+    // Rule node class names; the same string is passed to TypeCastInserter to say which operator type to expect.
+    public static final String LOLOAD_CLASSNAME = "org.apache.pig.impl.logicalLayer.LOLoad";
+    public static final String LOSTREAM_CLASSNAME = "org.apache.pig.impl.logicalLayer.LOStream";
 
     public LogicalOptimizer(LogicalPlan plan) {
         super(plan);
@@ -53,12 +56,22 @@
         // Add type casting to plans where the schema has been declared (by
         // user, data, or data catalog).
         nodes = new ArrayList<String>(1);
-        nodes.add("org.apache.pig.impl.logicalLayer.LOLoad");
+        nodes.add(LOLOAD_CLASSNAME);
         edges = new HashMap<Integer, Integer>();
         required = new ArrayList<Boolean>(1);
         required.add(true);
         mRules.add(new Rule<LogicalOperator, LogicalPlan>(nodes, edges, required,
-            new TypeCastInserter(plan)));
+            new TypeCastInserter(plan, LOLOAD_CLASSNAME)));
+        
+        // Add type casting to plans where the schema has been declared by
+        // user in a statement with stream operator.
+        nodes = new ArrayList<String>(1);
+        nodes.add(LOSTREAM_CLASSNAME);
+        edges = new HashMap<Integer, Integer>();
+        required = new ArrayList<Boolean>(1);
+        required.add(true);
+        mRules.add(new Rule(nodes, edges, required,
+            new TypeCastInserter(plan, LOSTREAM_CLASSNAME)));
         
         // Push up limit where ever possible.
         nodes = new ArrayList<String>(1);

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java?rev=693961&r1=693960&r2=693961&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/OpLimitOptimizer.java Wed Sep 10 13:00:36 2008
@@ -73,7 +73,7 @@
     public void transform(List<LogicalOperator> nodes) throws OptimizerException {
         LogicalOperator lo = nodes.get(0);
         if (lo == null || !(lo instanceof LOLimit)) {
-            throw new RuntimeException("Expected load, got " +
+            throw new RuntimeException("Expected limit, got " +
                 lo.getClass().getName());
         }
 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java?rev=693961&r1=693960&r2=693961&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/optimizer/TypeCastInserter.java Wed Sep 10 13:00:36 2008
@@ -27,10 +27,9 @@
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.LOCast;
 import org.apache.pig.impl.logicalLayer.LOForEach;
-import org.apache.pig.impl.logicalLayer.LOGenerate;
 import org.apache.pig.impl.logicalLayer.LOLoad;
 import org.apache.pig.impl.logicalLayer.LOProject;
-import org.apache.pig.impl.logicalLayer.LOVisitor;
+import org.apache.pig.impl.logicalLayer.LOStream;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -38,9 +37,6 @@
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.optimizer.OptimizerException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 /**
  * A visitor to discover if any schema has been specified for a file being
  * loaded.  If so, a projection will be injected into the plan to cast the
@@ -52,23 +48,17 @@
  */
 public class TypeCastInserter extends LogicalTransformer {
 
-    private static final Log log = LogFactory.getLog(TypeCastInserter.class);
+    private String operatorClassName; // LogicalOptimizer.LOLOAD_CLASSNAME or LOSTREAM_CLASSNAME; getOperator() rejects anything else
 
-    public TypeCastInserter(LogicalPlan plan) {
+    public TypeCastInserter(LogicalPlan plan, String operatorClassName) { // operatorClassName names the operator type this rule's matched node must be
         super(plan, new DepthFirstWalker<LogicalOperator, LogicalPlan>(plan));
+        this.operatorClassName = operatorClassName;
     }
 
     @Override
     public boolean check(List<LogicalOperator> nodes) throws OptimizerException {
         try {
-            LogicalOperator lo = nodes.get(0);
-            if (lo == null || !(lo instanceof LOLoad)) {
-                throw new RuntimeException("Expected load, got " +
-                    lo.getClass().getName());
-            }
-
-            LOLoad load = (LOLoad)lo;
-            Schema s = load.getSchema();
+            Schema s = getOperator(nodes).getSchema();
             if (s == null) return false;
     
             boolean sawOne = false;
@@ -86,20 +76,36 @@
                 " check if type casts are needed", fe);
         }
     }
-
-    @Override
-    public void transform(List<LogicalOperator> nodes) throws OptimizerException {
-        try {
-            LogicalOperator lo = nodes.get(0);
+    /** Returns nodes.get(0) after checking it is the operator type named by operatorClassName; an unknown name raises FrontendException. */
+    private LogicalOperator getOperator(List<LogicalOperator> nodes) throws FrontendException {
+        LogicalOperator lo = nodes.get(0);
+        if(LogicalOptimizer.LOLOAD_CLASSNAME.equals(operatorClassName)) { // equals(), not ==: the name is a constructor argument, not guaranteed to be the same instance
             if (lo == null || !(lo instanceof LOLoad)) {
                 throw new RuntimeException("Expected load, got " +
-                    lo.getClass().getName());
+                    (lo == null ? "null" : lo.getClass().getName()));
             }
-
-            LOLoad load = (LOLoad)lo;
     
-            Schema s = load.getSchema();
-            String scope = load.getOperatorKey().scope;
+            return lo;
+        } else if(LogicalOptimizer.LOSTREAM_CLASSNAME.equals(operatorClassName)){
+            if (lo == null || !(lo instanceof LOStream)) {
+                throw new RuntimeException("Expected stream, got " +
+                    (lo == null ? "null" : lo.getClass().getName()));
+            }
+            // Both branches return the same node; only the expected operator type differs.
+            return lo;
+        } else {
+            // we should never be called with any other operator class name
+            throw new FrontendException("TypeCastInserter invoked with an invalid operator class name:" + operatorClassName);
+        }
+   
+    }
+
+    @Override
+    public void transform(List<LogicalOperator> nodes) throws OptimizerException {
+        try {
+            LogicalOperator lo = getOperator(nodes);
+            Schema s = lo.getSchema();
+            String scope = lo.getOperatorKey().scope;
             // For every field, build a logical plan.  If the field has a type
             // other than byte array, then the plan will be cast(project).  Else
             // it will just be project.
@@ -113,7 +119,7 @@
                 List<Integer> toProject = new ArrayList<Integer>(1);
                 toProject.add(i);
                 LOProject proj = new LOProject(p, OperatorKey.genOpKey(scope),
-                    load, toProject);
+                    lo, toProject);
                 p.add(proj);
                 Schema.FieldSchema fs = s.getField(i);
                 if (fs.type != DataType.BYTEARRAY) {
@@ -136,7 +142,7 @@
                 OperatorKey.genOpKey(scope), genPlans, flattens);
 
             // Insert the foreach into the plan and patch up the plan.
-            insertAfter(load, foreach, null);
+            insertAfter(lo, foreach, null);
 
             rebuildSchemas();
 

Added: incubator/pig/branches/types/test/org/apache/pig/test/PigExecTestCase.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/PigExecTestCase.java?rev=693961&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/PigExecTestCase.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/PigExecTestCase.java Wed Sep 10 13:00:36 2008
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import static org.apache.pig.ExecType.MAPREDUCE;
+import static org.apache.pig.ExecType.LOCAL;
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigServer;
+import org.apache.pig.ExecType;
+import org.junit.After;
+import org.junit.Before;
+/** Base test case that creates pigServer in LOCAL or MAPREDUCE mode (default MAPREDUCE, overridable via the test.exectype system property). */
+public abstract class PigExecTestCase extends TestCase {
+
+    protected final Log log = LogFactory.getLog(getClass());
+    // Default exec type; setUp() replaces it when test.exectype is set and non-empty.
+    protected ExecType execType = MAPREDUCE;
+    // Only built in MAPREDUCE mode, where its properties configure pigServer.
+    private MiniCluster cluster;
+    protected PigServer pigServer;
+    // Created in setUp(), shut down in tearDown().
+    @Before
+    @Override
+    protected void setUp() throws Exception {
+        // Let a non-empty test.exectype system property override the default exec type.
+        String execTypeString = System.getProperty("test.exectype");
+        if(execTypeString!=null && execTypeString.length()>0){
+            execType = PigServer.parseExecType(execTypeString);
+        }
+        if(execType == MAPREDUCE) {
+            cluster = MiniCluster.buildCluster();
+            pigServer = new PigServer(MAPREDUCE, cluster.getProperties());
+        } else {
+            pigServer = new PigServer(LOCAL);
+        }
+    }
+
+    @After
+    @Override
+    protected void tearDown() throws Exception {
+        pigServer.shutdown(); // NOTE(review): cluster from setUp() is not shut down here; confirm MiniCluster is meant to be shared across tests
+    }
+}

Added: incubator/pig/branches/types/test/org/apache/pig/test/TestStreaming.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestStreaming.java?rev=693961&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestStreaming.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestStreaming.java Wed Sep 10 13:00:36 2008
@@ -0,0 +1,595 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.DefaultTupleFactory;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestStreaming extends PigExecTestCase {
+    
+    private TupleFactory tf = DefaultTupleFactory.getInstance();
+
+	private static final String simpleEchoStreamingCommand; // perl one-liner that prints each input line unchanged
+        static {
+            if (System.getProperty("os.name").toUpperCase(java.util.Locale.ROOT).startsWith("WINDOWS")) // Locale.ROOT: under e.g. a Turkish default locale "i" upper-cases to a dotted I and the match fails
+                simpleEchoStreamingCommand = "perl -ne 'print \\\"$_\\\"'"; // Windows variant passes the inner quotes backslash-escaped
+            else
+                simpleEchoStreamingCommand = "perl -ne 'print \"$_\"'";
+        }
+
+    private Tuple[] setupExpectedResults(Object[] firstField, Object[] secondField) throws ExecException { // Pairs firstField[i] with secondField[i] into 2-field tuples; arrays must be equal length
+		Assert.assertEquals(firstField.length, secondField.length);
+		
+		Tuple[] expectedResults = new Tuple[firstField.length];
+		for (int i=0; i < expectedResults.length; ++i) {
+			expectedResults[i] = tf.newTuple(2);
+			expectedResults[i].set(0, firstField[i]);
+			expectedResults[i].set(1, secondField[i]);
+		}
+		
+		return expectedResults;
+	}
+	
+	@Test
+	public void testSimpleMapSideStreaming() 
+	throws Exception { // Filter, then echo through two streams; the second declares (f0:chararray, f1:int) only when withTypes[i]
+		File input = Util.createInputFile("tmp", "", 
+				                          new String[] {"A,1", "B,2", "C,3", "D,2",
+				                                        "A,5", "B,5", "C,8", "A,8",
+				                                        "D,8", "A,9"});
+
+		// Expected results
+		String[] expectedFirstFields = new String[] {"A", "B", "C", "A", "D", "A"};
+		Integer[] expectedSecondFields = new Integer[] {5, 5, 8, 8, 8, 9};
+		boolean[] withTypes = {true, false}; // true: typed output schema; false: no schema, so fields are expected as bytearrays
+        for (int i = 0; i < withTypes.length; i++) {
+    		Tuple[] expectedResults = null;
+            if(withTypes[i] == true) {
+                expectedResults = 
+                    setupExpectedResults(expectedFirstFields, expectedSecondFields);
+            } else {
+                expectedResults = 
+                    setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), Util.toDataByteArrays(expectedSecondFields));
+            }
+    
+    		// Pig query to run
+    		pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using " + 
+    				                PigStorage.class.getName() + "(',');");
+    		pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
+            pigServer.registerQuery("S1 = stream FILTERED_DATA through `" +
+                                    simpleEchoStreamingCommand + "`;");
+            if(withTypes[i] == true) {
+                pigServer.registerQuery("OP = stream S1 through `" +
+                        simpleEchoStreamingCommand + "` as (f0:chararray, f1:int);");
+            } else {
+                pigServer.registerQuery("OP = stream S1 through `" +
+    				                simpleEchoStreamingCommand + "`;");
+            }
+    		
+    		// Run the query and check the results
+    		Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
+        }
+	}
+    
+	@Test
+	public void testSimpleMapSideStreamingWithOutputSchema() 
+	throws Exception { // Echo-stream with an output schema (typed, or names only), then filter on the named field f1
+		File input = Util.createInputFile("tmp", "", 
+				                          new String[] {"A,1", "B,2", "C,3", "D,2",
+				                                        "A,5", "B,5", "C,8", "A,8",
+				                                        "D,8", "A,9"});
+
+		// Expected results
+		Object[] expectedFirstFields = new String[] {"C", "A", "D", "A"};
+		Object[] expectedSecondFields = new Integer[] {8, 8, 8, 9};
+		
+		boolean[] withTypes = {true, false}; // true: (f0:chararray, f1:int); false: (f0, f1), expected back as bytearrays
+		for (int i = 0; i < withTypes.length; i++) {
+		    Tuple[] expectedResults = null;
+		    if(withTypes[i] == true) {
+		        expectedResults = 
+	                setupExpectedResults(expectedFirstFields, expectedSecondFields);
+		    } else {
+		        expectedResults = 
+                    setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), Util.toDataByteArrays(expectedSecondFields));
+		    }
+	        // Pig query to run
+	        pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using " + 
+	                                PigStorage.class.getName() + "(',');");
+	        pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
+	        if(withTypes[i] == true) {
+	            pigServer.registerQuery("STREAMED_DATA = stream FILTERED_DATA through `" +
+                        simpleEchoStreamingCommand + "` as (f0:chararray, f1:int);");
+	        } else {
+	            pigServer.registerQuery("STREAMED_DATA = stream FILTERED_DATA through `" +
+	                                simpleEchoStreamingCommand + "` as (f0, f1);");
+	        }
+	        pigServer.registerQuery("OP = filter STREAMED_DATA by f1 > 6;");
+	        
+	        // Run the query and check the results
+	        Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
+        }
+	}
+
+	@Test
+	public void testSimpleReduceSideStreamingAfterFlatten() 
+	throws Exception { // Group, flatten, then echo through two streams; the second declares a typed schema only when withTypes[i]
+		File input = Util.createInputFile("tmp", "", 
+				                          new String[] {"A,1", "B,2", "C,3", "D,2",
+				                                        "A,5", "B,5", "C,8", "A,8",
+				                                        "D,8", "A,9"});
+
+		// Expected results
+		String[] expectedFirstFields = new String[] {"A", "A", "A", "B", "C", "D"};
+		Integer[] expectedSecondFields = new Integer[] {5, 8, 9, 5, 8, 8};
+		boolean[] withTypes = {true, false}; // true: typed output schema; false: no schema, so fields are expected as bytearrays
+        for (int i = 0; i < withTypes.length; i++) {
+            Tuple[] expectedResults = null;
+            if(withTypes[i] == true) {
+                expectedResults = 
+                    setupExpectedResults(expectedFirstFields, expectedSecondFields);
+            } else {
+                expectedResults = 
+                    setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), Util.toDataByteArrays(expectedSecondFields));
+            }
+
+    		// Pig query to run
+    		pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using " + 
+    				                PigStorage.class.getName() + "(',');");
+    		pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
+    		pigServer.registerQuery("GROUPED_DATA = group FILTERED_DATA by $0;");
+    		pigServer.registerQuery("FLATTENED_GROUPED_DATA = foreach GROUPED_DATA " +
+    				                "generate flatten($1);");
+            pigServer.registerQuery("S1 = stream FLATTENED_GROUPED_DATA through `" +
+                                    simpleEchoStreamingCommand + "`;");
+            if(withTypes[i] == true) {
+                pigServer.registerQuery("OP = stream S1 through `" +
+                        simpleEchoStreamingCommand + "` as (f0:chararray, f1:int);");
+            } else {
+                pigServer.registerQuery("OP = stream S1 through `" +
+                                    simpleEchoStreamingCommand + "`;");
+            }
+    		
+    		// Run the query and check the results
+    		Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
+        }
+	}
+
+    @Test
+	public void testSimpleOrderedReduceSideStreamingAfterFlatten() throws Exception { // Order each $0 group by $2, $3, flatten, then echo-stream with a 4-field typed schema
+		File input = Util.createInputFile("tmp", "", 
+				                          new String[] {"A,1,2,3", "B,2,4,5",
+				                                        "C,3,1,2", "D,2,5,2",
+				                                        "A,5,5,1", "B,5,7,4",
+				                                        "C,8,9,2", "A,8,4,5",
+				                                        "D,8,8,3", "A,9,2,5"}
+		                                 );
+
+		// Expected results
+		String[] expectedFirstFields = 
+			new String[] {"A", "A", "A", "A", "B", "B", "C", "C", "D", "D"};
+		Integer[] expectedSecondFields = new Integer[] {1, 9, 8, 5, 2, 5, 3, 8, 2, 8};
+		Integer[] expectedThirdFields = new Integer[] {2, 2, 4, 5, 4, 7, 1, 9, 5, 8};
+		Integer[] expectedFourthFields = new Integer[] {3, 5, 5, 1, 5, 4, 2, 2, 2, 3};
+		Tuple[] expectedResults = new Tuple[10];
+		for (int i = 0; i < expectedResults.length; ++i) {
+			expectedResults[i] = tf.newTuple(4);
+			expectedResults[i].set(0, expectedFirstFields[i]);
+			expectedResults[i].set(1, expectedSecondFields[i]);
+			expectedResults[i].set(2, expectedThirdFields[i]);
+			expectedResults[i].set(3, expectedFourthFields[i]);
+		}
+			//setupExpectedResults(expectedFirstFields, expectedSecondFields);
+
+		// Pig query to run
+		pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using " + 
+				                PigStorage.class.getName() + "(',');");
+		pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
+        pigServer.registerQuery("S1 = stream FILTERED_DATA through `" + // NOTE(review): FILTERED_DATA, S1 and S2 never feed OP (GROUPED_DATA groups IP); confirm intent
+                                simpleEchoStreamingCommand + "`;");
+        pigServer.registerQuery("S2 = stream S1 through `" +
+                                simpleEchoStreamingCommand + "`;");
+		pigServer.registerQuery("GROUPED_DATA = group IP by $0;");
+		pigServer.registerQuery("ORDERED_DATA = foreach GROUPED_DATA { " +
+				                "  D = order IP BY $2, $3;" +
+                                "  generate flatten(D);" +
+                                "};");
+        pigServer.registerQuery("S3 = stream ORDERED_DATA through `" +
+                                simpleEchoStreamingCommand + "`;");
+		pigServer.registerQuery("OP = stream S3 through `" +
+				                simpleEchoStreamingCommand + "` as (f0:chararray, f1:int, f2:int, f3:int);");
+		
+		// Run the query and check the results
+		Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
+	}
+
+    @Test
+    public void testInputShipSpecs() throws Exception { // Ship two perl scripts that read their input from a named file ('foo' / 'bar') and chain them
+        // FIXME : this should be tested in all modes
+        if(execType == ExecType.LOCAL)
+            return;
+        File input = Util.createInputFile("tmp", "", 
+                                          new String[] {"A,1", "B,2", "C,3", 
+                                                        "D,2", "A,5", "B,5", 
+                                                        "C,8", "A,8", "D,8", 
+                                                        "A,9"});
+
+        // Perl script 
+        String[] script = 
+            new String[] {
+                          "#!/usr/bin/perl",
+                          "open(INFILE,  $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";",
+                          "while (<INFILE>) {",
+                          "  chomp $_;",
+                          "  print STDOUT \"$_\n\";",
+                          "  print STDERR \"STDERR: $_\n\";",
+                          "}",
+                         };
+        File command1 = Util.createInputFile("script", "pl", script);
+        File command2 = Util.createInputFile("script", "pl", script);
+        
+        // Expected results
+        String[] expectedFirstFields = 
+            new String[] {"A", "B", "C", "A", "D", "A"};
+        Integer[] expectedSecondFields = new Integer[] {5, 5, 8, 8, 8, 9};
+        Tuple[] expectedResults =
+                setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), Util.toDataByteArrays(expectedSecondFields));
+
+        // Pig query to run
+        // Each command passes its input file name ('foo' / 'bar') as the script's $ARGV[0].
+        pigServer.registerQuery(
+                "define CMD1 `" + command1.getName() + " foo` " +
+                "ship ('" + Util.encodeEscape(command1.toString()) + "') " +
+                "input('foo' using " + PigStorage.class.getName() + "(',')) " +
+                "output(stdout using " + PigStorage.class.getName() + "(',')) " +
+                "stderr();"); 
+        pigServer.registerQuery(
+                "define CMD2 `" + command2.getName() + " bar` " +
+                "ship ('" + Util.encodeEscape(command2.toString()) + "') " +
+                "input('bar' using " + PigStorage.class.getName() + "(',')) " +
+                "output(stdout using " + PigStorage.class.getName() + "(',')) " +        
+                "stderr();"); 
+        pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using " + 
+                                PigStorage.class.getName() + "(',');");
+        pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;");
+        pigServer.registerQuery("STREAMED_DATA = stream FILTERED_DATA " +
+        		                "through CMD1;");
+        pigServer.registerQuery("OP = stream STREAMED_DATA through CMD2;");
+        
+        String output = "/pig/out";
+        pigServer.deleteFile(output);
+        pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
+        
+        InputStream op = FileLocalizer.open(output, pigServer.getPigContext()); // NOTE(review): op is never closed in this test
+        PigStorage ps = new PigStorage(",");
+        ps.bindTo("", new BufferedPositionedInputStream(op), 0, Long.MAX_VALUE); 
+        List<Tuple> outputs = new ArrayList<Tuple>();
+        Tuple t;
+        while ((t = ps.getNext()) != null) {
+            outputs.add(t);
+        }
+
+        // Run the query and check the results
+        Util.checkQueryOutputs(outputs.iterator(), expectedResults);
+    }
+
+    @Test
+    public void testInputCacheSpecs() throws Exception { // Run two perl scripts distributed via cache('<path>#scriptN.pl') rather than ship
+        // Can't run this without HDFS
+        if(execType == ExecType.LOCAL)
+            return;
+        
+        File input = Util.createInputFile("tmp", "", 
+                                          new String[] {"A,1", "B,2", "C,3", 
+                                                        "D,2", "A,5", "B,5", 
+                                                        "C,8", "A,8", "D,8", 
+                                                        "A,9"});
+
+        // Perl script 
+        String[] script = 
+            new String[] {
+                          "#!/usr/bin/perl",
+                          "open(INFILE,  $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";",
+                          "while (<INFILE>) {",
+                          "  chomp $_;",
+                          "  print STDOUT \"$_\n\";",
+                          "  print STDERR \"STDERR: $_\n\";",
+                          "}",
+                         };
+        // Copy the scripts to HDFS
+        File command1 = Util.createInputFile("script", "pl", script);
+        File command2 = Util.createInputFile("script", "pl", script);
+        String c1 = FileLocalizer.hadoopify(command1.toString(), 
+                                            pigServer.getPigContext());
+        String c2 = FileLocalizer.hadoopify(command2.toString(), 
+                                            pigServer.getPigContext());
+        
+        // Expected results
+        String[] expectedFirstFields = 
+            new String[] {"A", "B", "C", "A", "D", "A"};
+        Integer[] expectedSecondFields = new Integer[] {5, 5, 8, 8, 8, 9};
+        Tuple[] expectedResults = 
+                setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), Util.toDataByteArrays(expectedSecondFields));
+
+        // Pig query to run
+        pigServer.registerQuery(
+                "define CMD1 `script1.pl foo` " +
+                "cache ('" + c1 + "#script1.pl') " +
+                "input('foo' using " + PigStorage.class.getName() + "(',')) " +
+                "stderr();"); 
+        pigServer.registerQuery(
+                "define CMD2 `script2.pl bar` " +
+                "cache ('" + c2 + "#script2.pl') " +
+                "input('bar' using " + PigStorage.class.getName() + "(',')) " +
+                "stderr();"); 
+        pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using " + 
+                                PigStorage.class.getName() + "(',');");
+        pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;");
+        pigServer.registerQuery("STREAMED_DATA = stream FILTERED_DATA " +
+                                "through CMD1;");
+        pigServer.registerQuery("OP = stream STREAMED_DATA through CMD2;");                
+
+        String output = "/pig/out";
+        pigServer.deleteFile(output);
+        pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
+        
+        InputStream op = FileLocalizer.open(output, pigServer.getPigContext()); // NOTE(review): op is never closed in this test
+        PigStorage ps = new PigStorage(",");
+        ps.bindTo("", new BufferedPositionedInputStream(op), 0, Long.MAX_VALUE); 
+        List<Tuple> outputs = new ArrayList<Tuple>();
+        Tuple t;
+        while ((t = ps.getNext()) != null) {
+            outputs.add(t);
+        }
+
+        // Run the query and check the results
+        Util.checkQueryOutputs(outputs.iterator(), expectedResults);
+    }
+
+    /**
+     * Streams filtered records through a shipped Perl script that writes two
+     * side outputs ('foo' and 'bar'), and verifies the contents of 'bar'.
+     */
+    @Test
+    public void testOutputShipSpecs() throws Exception {
+        // FIXME : this should be tested in all modes
+        if(execType == ExecType.LOCAL)
+            return;
+        File input = Util.createInputFile("tmp", "", 
+                                          new String[] {"A,1", "B,2", "C,3", 
+                                                        "D,2", "A,5", "B,5", 
+                                                        "C,8", "A,8", "D,8", 
+                                                        "A,9"});
+
+        // Perl script: echoes each STDIN record to the file named by its first
+        // argument (and to STDERR) and writes a constant "A,10" record per
+        // input record to the file named by its second argument.
+        // The die messages name the same argument that was opened, records are
+        // chomped so they are not written with a doubled newline, and "\\n"
+        // keeps the newline as a Perl escape instead of a raw line break
+        // inside the generated script.
+        String[] script = 
+            new String[] {
+                          "#!/usr/bin/perl",
+                          "open(OUTFILE, \">\", $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";",
+                          "open(OUTFILE2, \">\", $ARGV[1]) or die \"Can't open \".$ARGV[1].\"!: $!\";",
+                          "while (<STDIN>) {",
+                          "  chomp $_;",
+                          "  print OUTFILE \"$_\\n\";",
+                          "  print STDERR \"STDERR: $_\\n\";",
+                          "  print OUTFILE2 \"A,10\\n\";",
+                          "}",
+                         };
+        File command = Util.createInputFile("script", "pl", script);
+
+        // Expected results: one constant "A,10" record per filtered input record
+        String[] expectedFirstFields = 
+            new String[] {"A", "A", "A", "A", "A", "A"};
+        Integer[] expectedSecondFields = new Integer[] {10, 10, 10, 10, 10, 10};
+        Tuple[] expectedResults = 
+                setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), Util.toDataByteArrays(expectedSecondFields));
+
+        // Pig query to run
+        pigServer.registerQuery(
+                "define CMD `" + command.getName() + " foo bar` " +
+                "ship ('" + Util.encodeEscape(command.toString()) + "') " +
+                "output('foo' using " + PigStorage.class.getName() + "(','), " +
+                "'bar' using " + PigStorage.class.getName() + "(',')) " +
+                "stderr();"); 
+        pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using " + 
+                                PigStorage.class.getName() + "(',');");
+        pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;");
+        pigServer.registerQuery("OP = stream FILTERED_DATA through CMD;");
+
+        String output = "/pig/out";
+        pigServer.deleteFile(output);
+        try {
+            pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
+
+            // Only the secondary output 'bar' is verified here.
+            List<Tuple> outputs = new ArrayList<Tuple>();
+            InputStream op = FileLocalizer.open(output+"/bar", 
+                                                pigServer.getPigContext());
+            try {
+                PigStorage ps = new PigStorage(",");
+                ps.bindTo("", new BufferedPositionedInputStream(op), 0, Long.MAX_VALUE); 
+                Tuple t;
+                while ((t = ps.getNext()) != null) {
+                    outputs.add(t);
+                }
+            } finally {
+                // Release the handle even if parsing fails
+                op.close();
+            }
+
+            // Check the results
+            Util.checkQueryOutputs(outputs.iterator(), expectedResults);
+        } finally {
+            // Cleanup, matching testInputOutputSpecs, even on failure
+            pigServer.deleteFile(output);
+        }
+    }
+
+    /**
+     * Streams filtered records through a shipped Perl script that reads its
+     * input from a named file ('foo') and writes two named outputs ('bar' and
+     * 'foobar'), then verifies the contents of 'foobar'.
+     */
+    @Test
+    public void testInputOutputSpecs() throws Exception {
+        // FIXME : this should be tested in all modes
+        if(execType == ExecType.LOCAL)
+            return;
+        File input = Util.createInputFile("tmp", "", 
+                                          new String[] {"A,1", "B,2", "C,3", 
+                                                        "D,2", "A,5", "B,5", 
+                                                        "C,8", "A,8", "D,8", 
+                                                        "A,9"});
+
+        // Perl script: reads records from the file named by its first argument
+        // and copies each one to the files named by its second and third
+        // arguments (and to STDERR).
+        // Three-argument open keeps file names from being parsed as modes, and
+        // "\\n" keeps the newline as a Perl escape in the generated script.
+        String[] script = 
+            new String[] {
+                          "#!/usr/bin/perl",
+                          "open(INFILE, \"<\", $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";",
+                          "open(OUTFILE, \">\", $ARGV[1]) or die \"Can't open \".$ARGV[1].\"!: $!\";",
+                          "open(OUTFILE2, \">\", $ARGV[2]) or die \"Can't open \".$ARGV[2].\"!: $!\";",
+                          "while (<INFILE>) {",
+                          "  chomp $_;",
+                          "  print OUTFILE \"$_\\n\";",
+                          "  print STDERR \"STDERR: $_\\n\";",
+                          "  print OUTFILE2 \"$_\\n\";",
+                          "}",
+                         };
+        File command = Util.createInputFile("script", "pl", script);
+
+        // Expected results: the records that pass the filter, copied verbatim
+        String[] expectedFirstFields = 
+            new String[] {"A", "B", "C", "A", "D", "A"};
+        Integer[] expectedSecondFields = new Integer[] {5, 5, 8, 8, 8, 9};
+        Tuple[] expectedResults = 
+                setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), Util.toDataByteArrays(expectedSecondFields));
+        // Pig query to run
+        pigServer.registerQuery(
+                "define CMD `" + command.getName() + " foo bar foobar` " +
+                "ship ('" + Util.encodeEscape(command.toString()) + "') " +
+                "input('foo' using " + PigStorage.class.getName() + "(',')) " +
+                "output('bar', " +
+                "'foobar' using " + PigStorage.class.getName() + "(',')) " +
+                "stderr();"); 
+        pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using " + 
+                                PigStorage.class.getName() + "(',');");
+        pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > 3;");
+        pigServer.registerQuery("OP = stream FILTERED_DATA through CMD;");
+
+        String output = "/pig/out";
+        pigServer.deleteFile(output);
+        try {
+            pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
+
+            // Only the secondary output 'foobar' is verified here.
+            List<Tuple> outputs = new ArrayList<Tuple>();
+            InputStream op = FileLocalizer.open(output+"/foobar", 
+                                                pigServer.getPigContext());
+            try {
+                PigStorage ps = new PigStorage(",");
+                ps.bindTo("", new BufferedPositionedInputStream(op), 0, Long.MAX_VALUE); 
+                Tuple t;
+                while ((t = ps.getNext()) != null) {
+                    outputs.add(t);
+                }
+            } finally {
+                // Release the handle even if parsing fails
+                op.close();
+            }
+
+            // Check the results
+            Util.checkQueryOutputs(outputs.iterator(), expectedResults);
+        } finally {
+            // Cleanup, even if reading or checking the output fails
+            pigServer.deleteFile(output);
+        }
+    }
+
+    /**
+     * Streams every input record through two echo commands chained with a
+     * Unix pipe and expects the records back unchanged, first with a typed
+     * output schema and then untyped.
+     */
+    @Test
+    public void testSimpleMapSideStreamingWithUnixPipes() 
+    throws Exception {
+        File input = Util.createInputFile("tmp", "", 
+                                          new String[] {"A,1", "B,2", "C,3", "D,2",
+                                                        "A,5", "B,5", "C,8", "A,8",
+                                                        "D,8", "A,9"});
+
+        // The echo pipeline must return every record exactly as loaded.
+        String[] firstFields = 
+            new String[] {"A", "B", "C", "D", "A", "B", "C", "A", "D", "A"};
+        Integer[] secondFields = new Integer[] {1, 2, 3, 2, 5, 5, 8, 8, 8, 9};
+
+        // Query text that does not vary between the typed and untyped passes.
+        String pipedCommand = simpleEchoStreamingCommand + " | " + simpleEchoStreamingCommand;
+        String loadQuery = "IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using " + 
+                           PigStorage.class.getName() + "(',');";
+
+        for (boolean typed : new boolean[] {true, false}) {
+            // A declared schema yields typed fields; without one, fields stay bytearrays.
+            Tuple[] expectedResults = typed
+                ? setupExpectedResults(firstFields, secondFields)
+                : setupExpectedResults(Util.toDataByteArrays(firstFields),
+                                       Util.toDataByteArrays(secondFields));
+
+            pigServer.registerQuery("define CMD `" + pipedCommand + "`;");
+            pigServer.registerQuery(loadQuery);
+            String streamQuery = typed
+                ? "OP = stream IP through CMD as (f0:chararray, f1:int);"
+                : "OP = stream IP through CMD;";
+            pigServer.registerQuery(streamQuery);
+
+            // Run the query and check the results
+            Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
+        }
+    }
+
+    /** LOCAL-mode variant of the negative load/store optimization test. */
+    @Test
+    public void testLocalNegativeLoadStoreOptimization() throws Exception {
+        testNegativeLoadStoreOptimization(ExecType.LOCAL);
+    }
+    
+    /** MAPREDUCE-mode variant of the negative load/store optimization test. */
+    @Test
+    public void testMRNegativeLoadStoreOptimization() throws Exception {
+        testNegativeLoadStoreOptimization(ExecType.MAPREDUCE);
+    }
+    
+    /**
+     * Loads a file split by 'file', filters it, and streams the result through
+     * an inline echo command. The stream runs once with a typed output schema
+     * and once untyped, and the output is checked each time.
+     *
+     * @param requestedType the exec type this variant is meant to cover. The
+     *        query always runs on the fixture's shared {@code pigServer}, so
+     *        the variant is skipped unless it matches the fixture's exec type.
+     *        Otherwise both wrappers would run the identical query in the same
+     *        mode under misleading LOCAL/MR names.
+     */
+    private void testNegativeLoadStoreOptimization(ExecType requestedType) 
+    throws Exception {
+        // The parameter used to be named execType and shadowed the fixture
+        // field, so it was silently ignored.
+        if (requestedType != execType) {
+            return;
+        }
+        File input = Util.createInputFile("tmp", "", 
+                                          new String[] {"A,1", "B,2", "C,3", "D,2",
+                                                        "A,5", "B,5", "C,8", "A,8",
+                                                        "D,8", "A,9"});
+
+        // Expected results
+        String[] expectedFirstFields = new String[] {"A", "B", "C", "A", "D", "A"};
+        Integer[] expectedSecondFields = new Integer[] {5, 5, 8, 8, 8, 9};
+        boolean[] withTypes = {true, false};
+        for (int i = 0; i < withTypes.length; i++) {
+            Tuple[] expectedResults = null;
+            if(withTypes[i]) {
+                expectedResults = 
+                    setupExpectedResults(expectedFirstFields, expectedSecondFields);
+            } else {
+                expectedResults = 
+                    setupExpectedResults(Util.toDataByteArrays(expectedFirstFields), Util.toDataByteArrays(expectedSecondFields));
+            }
+
+            // Pig query to run
+            // NOTE(review): CMD is defined with a PigDump input spec, but the
+            // stream statements below use an inline command, so CMD is never
+            // referenced. Confirm whether that is intended.
+            pigServer.registerQuery("define CMD `"+ simpleEchoStreamingCommand + 
+                                    "` input(stdin using PigDump);");
+            pigServer.registerQuery("IP = load 'file:" + Util.encodeEscape(input.toString()) + "' using " + 
+                                    PigStorage.class.getName() + "(',') " +
+                                    "split by 'file';");
+            pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
+            if(withTypes[i]) {
+                pigServer.registerQuery("OP = stream FILTERED_DATA through `" +
+                                    simpleEchoStreamingCommand + "` as (f0:chararray, f1:int);");
+            } else {
+                pigServer.registerQuery("OP = stream FILTERED_DATA through `" +
+                                    simpleEchoStreamingCommand + "`;");
+            }
+            
+            // Run the query and check the results
+            Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
+        }
+    }
+}

Modified: incubator/pig/branches/types/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/Util.java?rev=693961&r1=693960&r2=693961&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/Util.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/Util.java Wed Sep 10 13:00:36 2008
@@ -122,10 +122,10 @@
         return m;
     }
 
-    static public DataByteArray[] toDataByteArrays(String[] input) {
+    /**
+     * Converts each element of {@code input} into a DataByteArray holding the
+     * bytes of the element's toString() form. Null elements map to null
+     * entries at the same index.
+     *
+     * @param input values to convert (e.g. String or Integer fields)
+     * @return a new array, the same length as {@code input}
+     */
+    static public<T> DataByteArray[] toDataByteArrays(T[] input) {
         DataByteArray[] dbas = new DataByteArray[input.length];
         for (int i = 0; i < input.length; i++) {
-            dbas[i] = (input[i] == null)?null:new DataByteArray(input[i].getBytes());
+            // NOTE(review): getBytes() with no argument uses the platform default charset
+            dbas[i] = (input[i] == null)?null:new DataByteArray(input[i].toString().getBytes());
         }        
         return dbas;
     }
@@ -185,7 +185,7 @@
 	    
 		for (Tuple expected : expectedResults) {
 			Tuple actual = actualResults.next();
-			Assert.assertEquals(expected.toString(), actual.toString());
+			Assert.assertEquals(expected, actual);
 		}
 	}