You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@pig.apache.org by pi...@apache.org on 2008/07/16 17:38:31 UTC

svn commit: r677313 - in /incubator/pig/branches/types: src/org/apache/pig/impl/mapReduceLayer/ src/org/apache/pig/impl/physicalLayer/ src/org/apache/pig/impl/physicalLayer/relationalOperators/ test/org/apache/pig/test/ test/org/apache/pig/test/utils/

Author: pisong
Date: Wed Jul 16 08:38:29 2008
New Revision: 677313

URL: http://svn.apache.org/viewvc?rev=677313&view=rev
Log:
PIG-305 Fixed POSort to work inside ForEach

Added:
    incubator/pig/branches/types/test/org/apache/pig/test/TestForEachNestedPlan.java
Modified:
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapReduce.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PhysicalOperator.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POForEach.java
    incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSort.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestOrderBy2.java
    incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapReduce.java?rev=677313&r1=677312&r2=677313&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapReduce.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapReduce.java Wed Jul 16 08:38:29 2008
@@ -168,8 +168,9 @@
                             continue;
                         }
                         
-                        if(redRes.returnStatus==POStatus.STATUS_EOP)
+                        if(redRes.returnStatus==POStatus.STATUS_EOP) {
                             return;
+                        }
                         
                         if(redRes.returnStatus==POStatus.STATUS_NULL)
                             continue;
@@ -182,8 +183,9 @@
                     }
                 }
                 
-                if(res.returnStatus==POStatus.STATUS_NULL)
+                if(res.returnStatus==POStatus.STATUS_NULL) {
                     return;
+                }
                 
                 if(res.returnStatus==POStatus.STATUS_ERR){
                     IOException ioe = new IOException("Packaging error while processing group");

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PhysicalOperator.java?rev=677313&r1=677312&r2=677313&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PhysicalOperator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PhysicalOperator.java Wed Jul 16 08:38:29 2008
@@ -51,8 +51,7 @@
  * possible. The default is assumed to return an erroneus Result corresponding
  * to an unsupported operation on that type. So the operators need to implement
  * only those types that are supported.
- * 
- * @param <V>
+ *
  */
 public abstract class PhysicalOperator extends
         Operator<PhyPlanVisitor> {
@@ -211,9 +210,9 @@
         //Should be removed once the model is clear
         if(reporter!=null) reporter.progress();
             
-        if (!isInputAttached())
+        if (!isInputAttached()) {
             return inputs.get(0).getNext(inpValue);
-        else {
+        } else {
             res.result = input;
             res.returnStatus = POStatus.STATUS_OK;
             detachInput();

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POForEach.java?rev=677313&r1=677312&r2=677313&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POForEach.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POForEach.java Wed Jul 16 08:38:29 2008
@@ -126,10 +126,12 @@
                     processingPlan = false;
                     break;
                 }
-                if(res.returnStatus==POStatus.STATUS_ERR)
+                if(res.returnStatus==POStatus.STATUS_ERR) {
                     return res;
-                if(res.returnStatus==POStatus.STATUS_NULL)
+                }
+                if(res.returnStatus==POStatus.STATUS_NULL) {
                     continue;
+                }
             }
         }
         //The nested plan processing is done or is
@@ -138,10 +140,13 @@
         //read
         while (true) {
             inp = processInput();
-            if (inp.returnStatus == POStatus.STATUS_EOP || inp.returnStatus == POStatus.STATUS_ERR)
+            if (inp.returnStatus == POStatus.STATUS_EOP ||
+                    inp.returnStatus == POStatus.STATUS_ERR) {
                 return inp;
-            if (inp.returnStatus == POStatus.STATUS_NULL)
+            }
+            if (inp.returnStatus == POStatus.STATUS_NULL) {
                 continue;
+            }
             
             attachInputToPlans((Tuple) inp.result);
             
@@ -180,11 +185,13 @@
                 //Getting the iterators
                 //populate the input data
                 Result inputData = null;
-                Byte resultType = ((PhysicalOperator)planLeaves.get(i)).getResultType();
+                byte resultType = ((PhysicalOperator)planLeaves.get(i)).getResultType();
                 switch(resultType) {
-                case DataType.BAG : DataBag b = null;
-                inputData = ((PhysicalOperator)planLeaves.get(i)).getNext(b);
-                break;
+                case DataType.BAG:
+                    DataBag b = null;
+                    inputData = ((PhysicalOperator)planLeaves.get(i)).getNext(b);
+                    break;
+
                 case DataType.TUPLE : Tuple t = null;
                 inputData = ((PhysicalOperator)planLeaves.get(i)).getNext(t);
                 break;
@@ -212,6 +219,12 @@
                 case DataType.CHARARRAY : String str = null;
                 inputData = ((PhysicalOperator)planLeaves.get(i)).getNext(str);
                 break;
+
+                default:
+                    String msg = new String("Unknown type " +
+                        DataType.findTypeName(resultType));
+                    log.error(msg);
+                    throw new ExecException(msg);
                 }
                 
                 if(inputData.returnStatus == POStatus.STATUS_EOP) {

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSort.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSort.java?rev=677313&r1=677312&r2=677313&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSort.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSort.java Wed Jul 16 08:38:29 2008
@@ -265,8 +265,12 @@
         if (it.hasNext()) {
             res.result = it.next();
             res.returnStatus = POStatus.STATUS_OK;
-        } else
+        } else {
             res.returnStatus = POStatus.STATUS_EOP;
+            inputsAccumulated = false;
+            sortedBag = null;
+            it = null;
+        }
 		return res;
 	}
 

Added: incubator/pig/branches/types/test/org/apache/pig/test/TestForEachNestedPlan.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestForEachNestedPlan.java?rev=677313&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestForEachNestedPlan.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestForEachNestedPlan.java Wed Jul 16 08:38:29 2008
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import org.apache.pig.PigServer;
+import org.apache.pig.test.utils.TestHelper;
+import org.apache.pig.data.Tuple;
+import org.junit.Test;
+import junit.framework.TestCase;
+import junit.framework.Assert;
+
+import java.util.Iterator;
+import java.io.File;
+import java.io.IOException;
+import java.text.DecimalFormat;
+
+public class TestForEachNestedPlan extends TestCase {
+
+    private String initString = "mapreduce";
+    MiniCluster cluster = MiniCluster.buildCluster();
+
+    private PigServer pig ;
+
+    public TestForEachNestedPlan() throws Throwable {
+        pig = new PigServer(initString) ;
+    }
+
+    @Test
+    public void testInnerOrderBy() throws Exception {
+        File tmpFile = genDataSetFile1() ;
+        pig.registerQuery("a = load 'file:" + tmpFile + "'; ") ;
+        pig.registerQuery("b = group a by $0; ");
+        pig.registerQuery("c = foreach b { "
+                        + "     c1 = order $1 by *; "
+                        +  "    generate flatten(c1); "
+                        + "};") ;
+        Iterator<Tuple> it = pig.openIterator("c");
+        Tuple t = null ;
+        int count = 0 ;
+        while(it.hasNext()) {
+            t = it.next() ;
+            System.out.println(count + ":" + t) ;
+            count++ ;
+        }
+        Assert.assertEquals(count, 30);
+    }
+
+
+    /*
+    @Test
+    public void testInnerDistinct() throws Exception {
+        File tmpFile = genDataSetFile1() ;
+        pig.registerQuery("a = load 'file:" + tmpFile + "'; ") ;
+        pig.registerQuery("b = group a by $0; ");
+        pig.registerQuery("c = foreach b { "
+                        + "     c1 = distinct $1 ; "
+                        +  "    generate flatten(c1); "
+                        + "};") ;
+        Iterator<Tuple> it = pig.openIterator("c");
+        Tuple t = null ;
+        int count = 0 ;
+        while(it.hasNext()) {
+            t = it.next() ;
+            System.out.println(count + ":" + t) ;
+            count++ ;
+        }
+        Assert.assertEquals(count, 15);
+    }
+    */
+
+    /***
+     * For generating a sample dataset
+     */
+    private File genDataSetFile1() throws IOException {
+
+        int dataLength = 30;
+        String[][] data = new String[dataLength][] ;
+
+        DecimalFormat formatter = new DecimalFormat("0000000");
+
+        for (int i = 0; i < dataLength; i++) {
+            data[i] = new String[2] ;
+            data[i][0] = formatter.format(i % 10);
+            data[i][1] = formatter.format((dataLength - i)/2);
+        }
+
+        return TestHelper.createTempFile(data) ;
+    }
+}

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestOrderBy2.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestOrderBy2.java?rev=677313&r1=677312&r2=677313&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestOrderBy2.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestOrderBy2.java Wed Jul 16 08:38:29 2008
@@ -32,6 +32,7 @@
 import org.junit.Test;
 
 import org.apache.pig.PigServer;
+import org.apache.pig.test.utils.TestHelper;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
@@ -144,36 +145,6 @@
 
     //////////////////////// HELPERS ///////////////////////////////
 
-    /**
-     * Create temp file from a given dataset
-     * This assumes
-     *  1) The dataset has at least 1 record
-     *  2) All records are of the same size
-     */
-    private File createTempFile(String[][] data) throws IOException {
-
-        File fp1 = File.createTempFile("test", "txt");
-        PrintStream ps = new PrintStream(new FileOutputStream(fp1));
-
-        for(int i = 0; i < data.length ; i++) {
-
-            // Building up string for each line
-            StringBuilder sb = new StringBuilder() ;
-            for(int j = 0 ; j < data[0].length ; j++) {
-                if (j != 0) {
-                    sb.append("\t") ;
-                }
-                sb.append(data[i][j]) ;
-            }
-
-            // Write the line to file
-            ps.println(sb.toString());
-        }
-
-        ps.close();
-        return fp1 ;
-    }
-
     /***
      * Check if the given dataset is properly sorted
      * @param dataIter the dataset to be checked
@@ -265,7 +236,7 @@
             data[i][1] = formatter.format(dataLength - i - 1);
         }
 
-        return createTempFile(data) ;
+        return TestHelper.createTempFile(data) ;
     }
 
 
@@ -285,7 +256,7 @@
             data[i][1] = formatter.format(dataLength - i - 1);
         }
 
-        return createTempFile(data) ;
+        return TestHelper.createTempFile(data) ;
     }
 
     /***
@@ -304,6 +275,6 @@
             data[i][1] = formatter.format(i % 20);
         }
 
-        return createTempFile(data) ;
+        return TestHelper.createTempFile(data) ;
     }
 }

Modified: incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java?rev=677313&r1=677312&r2=677313&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java Wed Jul 16 08:38:29 2008
@@ -17,8 +17,7 @@
  */
 package org.apache.pig.test.utils;
 
-import java.io.IOException;
-import java.io.InputStream;
+import java.io.*;
 import java.util.Iterator;
 import java.util.Random;
 
@@ -211,4 +210,34 @@
         }
         return ret;
     }
+
+       /**
+     * Create temp file from a given dataset
+     * This assumes
+     *  1) The dataset has at least 1 record
+     *  2) All records are of the same size
+     */
+    public static File createTempFile(String[][] data) throws IOException {
+
+        File fp1 = File.createTempFile("test", "txt");
+        PrintStream ps = new PrintStream(new FileOutputStream(fp1));
+
+        for(int i = 0; i < data.length ; i++) {
+
+            // Building up string for each line
+            StringBuilder sb = new StringBuilder() ;
+            for(int j = 0 ; j < data[0].length ; j++) {
+                if (j != 0) {
+                    sb.append("\t") ;
+                }
+                sb.append(data[i][j]) ;
+            }
+
+            // Write the line to file
+            ps.println(sb.toString());
+        }
+
+        ps.close();
+        return fp1 ;
+    }
 }