You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pi...@apache.org on 2008/07/16 17:38:31 UTC
svn commit: r677313 - in /incubator/pig/branches/types:
src/org/apache/pig/impl/mapReduceLayer/
src/org/apache/pig/impl/physicalLayer/
src/org/apache/pig/impl/physicalLayer/relationalOperators/
test/org/apache/pig/test/ test/org/apache/pig/test/utils/
Author: pisong
Date: Wed Jul 16 08:38:29 2008
New Revision: 677313
URL: http://svn.apache.org/viewvc?rev=677313&view=rev
Log:
PIG-305 Fixed POSort to work inside ForEach
Added:
incubator/pig/branches/types/test/org/apache/pig/test/TestForEachNestedPlan.java
Modified:
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapReduce.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PhysicalOperator.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POForEach.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSort.java
incubator/pig/branches/types/test/org/apache/pig/test/TestOrderBy2.java
incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapReduce.java?rev=677313&r1=677312&r2=677313&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapReduce.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigMapReduce.java Wed Jul 16 08:38:29 2008
@@ -168,8 +168,9 @@
continue;
}
- if(redRes.returnStatus==POStatus.STATUS_EOP)
+ if(redRes.returnStatus==POStatus.STATUS_EOP) {
return;
+ }
if(redRes.returnStatus==POStatus.STATUS_NULL)
continue;
@@ -182,8 +183,9 @@
}
}
- if(res.returnStatus==POStatus.STATUS_NULL)
+ if(res.returnStatus==POStatus.STATUS_NULL) {
return;
+ }
if(res.returnStatus==POStatus.STATUS_ERR){
IOException ioe = new IOException("Packaging error while processing group");
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PhysicalOperator.java?rev=677313&r1=677312&r2=677313&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PhysicalOperator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/PhysicalOperator.java Wed Jul 16 08:38:29 2008
@@ -51,8 +51,7 @@
* possible. The default is assumed to return an erroneus Result corresponding
* to an unsupported operation on that type. So the operators need to implement
* only those types that are supported.
- *
- * @param <V>
+ *
*/
public abstract class PhysicalOperator extends
Operator<PhyPlanVisitor> {
@@ -211,9 +210,9 @@
//Should be removed once the model is clear
if(reporter!=null) reporter.progress();
- if (!isInputAttached())
+ if (!isInputAttached()) {
return inputs.get(0).getNext(inpValue);
- else {
+ } else {
res.result = input;
res.returnStatus = POStatus.STATUS_OK;
detachInput();
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POForEach.java?rev=677313&r1=677312&r2=677313&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POForEach.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POForEach.java Wed Jul 16 08:38:29 2008
@@ -126,10 +126,12 @@
processingPlan = false;
break;
}
- if(res.returnStatus==POStatus.STATUS_ERR)
+ if(res.returnStatus==POStatus.STATUS_ERR) {
return res;
- if(res.returnStatus==POStatus.STATUS_NULL)
+ }
+ if(res.returnStatus==POStatus.STATUS_NULL) {
continue;
+ }
}
}
//The nested plan processing is done or is
@@ -138,10 +140,13 @@
//read
while (true) {
inp = processInput();
- if (inp.returnStatus == POStatus.STATUS_EOP || inp.returnStatus == POStatus.STATUS_ERR)
+ if (inp.returnStatus == POStatus.STATUS_EOP ||
+ inp.returnStatus == POStatus.STATUS_ERR) {
return inp;
- if (inp.returnStatus == POStatus.STATUS_NULL)
+ }
+ if (inp.returnStatus == POStatus.STATUS_NULL) {
continue;
+ }
attachInputToPlans((Tuple) inp.result);
@@ -180,11 +185,13 @@
//Getting the iterators
//populate the input data
Result inputData = null;
- Byte resultType = ((PhysicalOperator)planLeaves.get(i)).getResultType();
+ byte resultType = ((PhysicalOperator)planLeaves.get(i)).getResultType();
switch(resultType) {
- case DataType.BAG : DataBag b = null;
- inputData = ((PhysicalOperator)planLeaves.get(i)).getNext(b);
- break;
+ case DataType.BAG:
+ DataBag b = null;
+ inputData = ((PhysicalOperator)planLeaves.get(i)).getNext(b);
+ break;
+
case DataType.TUPLE : Tuple t = null;
inputData = ((PhysicalOperator)planLeaves.get(i)).getNext(t);
break;
@@ -212,6 +219,12 @@
case DataType.CHARARRAY : String str = null;
inputData = ((PhysicalOperator)planLeaves.get(i)).getNext(str);
break;
+
+ default:
+ String msg = new String("Unknown type " +
+ DataType.findTypeName(resultType));
+ log.error(msg);
+ throw new ExecException(msg);
}
if(inputData.returnStatus == POStatus.STATUS_EOP) {
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSort.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSort.java?rev=677313&r1=677312&r2=677313&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSort.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/relationalOperators/POSort.java Wed Jul 16 08:38:29 2008
@@ -265,8 +265,12 @@
if (it.hasNext()) {
res.result = it.next();
res.returnStatus = POStatus.STATUS_OK;
- } else
+ } else {
res.returnStatus = POStatus.STATUS_EOP;
+ inputsAccumulated = false;
+ sortedBag = null;
+ it = null;
+ }
return res;
}
Added: incubator/pig/branches/types/test/org/apache/pig/test/TestForEachNestedPlan.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestForEachNestedPlan.java?rev=677313&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestForEachNestedPlan.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestForEachNestedPlan.java Wed Jul 16 08:38:29 2008
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import org.apache.pig.PigServer;
+import org.apache.pig.test.utils.TestHelper;
+import org.apache.pig.data.Tuple;
+import org.junit.Test;
+import junit.framework.TestCase;
+import junit.framework.Assert;
+
+import java.util.Iterator;
+import java.io.File;
+import java.io.IOException;
+import java.text.DecimalFormat;
+
+public class TestForEachNestedPlan extends TestCase {
+
+ private String initString = "mapreduce";
+ // Built for its side effect only; this class never reads the field.
+ MiniCluster cluster = MiniCluster.buildCluster();
+ private PigServer pig ;
+
+ public TestForEachNestedPlan() throws Throwable {
+ pig = new PigServer(initString) ;
+ }
+
+ /**
+ * ORDER BY nested in FOREACH (PIG-305): every input row must come back and,
+ * within each group, rows must be in ascending order of the second field.
+ */
+ @Test
+ public void testInnerOrderBy() throws Exception {
+ pig.registerQuery("a = load 'file:" + genDataSetFile1() + "'; ") ;
+ pig.registerQuery("b = group a by $0; ");
+ pig.registerQuery("c = foreach b { "
+ + " c1 = order $1 by *; "
+ + " generate flatten(c1); "
+ + "};") ;
+ Iterator<Tuple> it = pig.openIterator("c");
+ String prevKey = null, prevVal = null ;
+ int count = 0 ;
+ while(it.hasNext()) {
+ Tuple t = it.next() ;
+ String key = String.valueOf(t.get(0)) ;
+ String val = String.valueOf(t.get(1)) ;
+ // Fields are zero-padded digits, so string order equals numeric order.
+ if (key.equals(prevKey)) {
+ Assert.assertTrue("group " + key + " unsorted: " + prevVal + " > " + val,
+ prevVal.compareTo(val) <= 0);
+ }
+ prevKey = key ;
+ prevVal = val ;
+ count++ ;
+ }
+ Assert.assertEquals(30, count);
+ }
+
+ /* Disabled in this commit (presumably nested DISTINCT is not supported yet - confirm).
+ * All rows of genDataSetFile1() are unique, so DISTINCT keeps all 30 of them.
+ @Test
+ public void testInnerDistinct() throws Exception {
+ pig.registerQuery("a = load 'file:" + genDataSetFile1() + "'; ") ;
+ pig.registerQuery("b = group a by $0; ");
+ pig.registerQuery("c = foreach b { "
+ + " c1 = distinct $1 ; "
+ + " generate flatten(c1); "
+ + "};") ;
+ Iterator<Tuple> it = pig.openIterator("c");
+ int count = 0 ;
+ while(it.hasNext()) { it.next() ; count++ ; }
+ Assert.assertEquals(30, count);
+ }
+ */
+
+ /** 30 rows: key = i % 10, value = (30 - i) / 2, both zero-padded to 7 digits.
+ * Each key's 3 values appear in descending order in the file. */
+ private File genDataSetFile1() throws IOException {
+ int dataLength = 30;
+ String[][] data = new String[dataLength][] ;
+ DecimalFormat formatter = new DecimalFormat("0000000");
+ for (int i = 0; i < dataLength; i++) {
+ data[i] = new String[] { formatter.format(i % 10), formatter.format((dataLength - i)/2) };
+ }
+ File file = TestHelper.createTempFile(data) ;
+ file.deleteOnExit() ; // don't leave fixture files behind in the temp dir
+ return file ;
+ }
+}
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestOrderBy2.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestOrderBy2.java?rev=677313&r1=677312&r2=677313&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestOrderBy2.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestOrderBy2.java Wed Jul 16 08:38:29 2008
@@ -32,6 +32,7 @@
import org.junit.Test;
import org.apache.pig.PigServer;
+import org.apache.pig.test.utils.TestHelper;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
@@ -144,36 +145,6 @@
//////////////////////// HELPERS ///////////////////////////////
- /**
- * Create temp file from a given dataset
- * This assumes
- * 1) The dataset has at least 1 record
- * 2) All records are of the same size
- */
- private File createTempFile(String[][] data) throws IOException {
-
- File fp1 = File.createTempFile("test", "txt");
- PrintStream ps = new PrintStream(new FileOutputStream(fp1));
-
- for(int i = 0; i < data.length ; i++) {
-
- // Building up string for each line
- StringBuilder sb = new StringBuilder() ;
- for(int j = 0 ; j < data[0].length ; j++) {
- if (j != 0) {
- sb.append("\t") ;
- }
- sb.append(data[i][j]) ;
- }
-
- // Write the line to file
- ps.println(sb.toString());
- }
-
- ps.close();
- return fp1 ;
- }
-
/***
* Check if the given dataset is properly sorted
* @param dataIter the dataset to be checked
@@ -265,7 +236,7 @@
data[i][1] = formatter.format(dataLength - i - 1);
}
- return createTempFile(data) ;
+ return TestHelper.createTempFile(data) ;
}
@@ -285,7 +256,7 @@
data[i][1] = formatter.format(dataLength - i - 1);
}
- return createTempFile(data) ;
+ return TestHelper.createTempFile(data) ;
}
/***
@@ -304,6 +275,6 @@
data[i][1] = formatter.format(i % 20);
}
- return createTempFile(data) ;
+ return TestHelper.createTempFile(data) ;
}
}
Modified: incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java?rev=677313&r1=677312&r2=677313&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java Wed Jul 16 08:38:29 2008
@@ -17,8 +17,7 @@
*/
package org.apache.pig.test.utils;
-import java.io.IOException;
-import java.io.InputStream;
+import java.io.*;
import java.util.Iterator;
import java.util.Random;
@@ -211,4 +210,34 @@
}
return ret;
}
+
+ /**
+ * Create a temp file from a given dataset: one line per record, fields
+ * tab-separated. Records may differ in length; an empty dataset gives an empty file.
+ * @return the newly created temp file
+ * @throws IOException if the file cannot be created or fully written
+ */
+ public static File createTempFile(String[][] data) throws IOException {
+ File fp1 = File.createTempFile("test", "txt");
+ PrintStream ps = new PrintStream(new FileOutputStream(fp1));
+ try {
+ for (String[] record : data) {
+ StringBuilder sb = new StringBuilder() ;
+ for (int j = 0 ; j < record.length ; j++) {
+ if (j != 0) {
+ sb.append("\t") ;
+ }
+ sb.append(record[j]) ;
+ }
+ ps.println(sb.toString());
+ }
+ } finally {
+ ps.close(); // always release the file handle, even if a record is bad
+ }
+ // PrintStream swallows IOExceptions; report them instead of returning a truncated file.
+ if (ps.checkError()) {
+ throw new IOException("Failed writing temp file " + fp1);
+ }
+ return fp1 ;
+ }
}