You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by th...@apache.org on 2011/09/15 19:51:30 UTC
svn commit: r1171195 [2/2] - in /pig/trunk: ./ conf/ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/
src/org/apache/pig/backend/hadoop/executi...
Added: pig/trunk/test/org/apache/pig/test/TestPOPartialAgg.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPOPartialAgg.java?rev=1171195&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPOPartialAgg.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestPOPartialAgg.java Thu Sep 15 17:51:27 2011
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
+import org.apache.pig.builtin.IntSum;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.parser.ParserException;
+import org.apache.pig.test.utils.GenPhyOp;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit tests for the runtime behavior of {@link POPartialAgg}.
+ * <p>
+ * Every test drives a POPartialAgg whose key plan projects input column 0 and
+ * whose single value plan applies {@code IntSum.Intermediate} to column 1.
+ * Tuples are pushed one at a time with {@code attachInput}/{@code getNext};
+ * end of input is signalled through {@code parentPlan.endOfAllInput}.
+ */
+public class TestPOPartialAgg {
+    /** Operator under test; rebuilt before every test by setUp(). */
+    POPartialAgg partAggOp;
+    /** Plan owning partAggOp; its endOfAllInput flag marks the end of input. */
+    PhysicalPlan parentPlan;
+    /** Typed null, used only to select the Tuple overload of getNext. */
+    Tuple dummyTuple = null;
+    /** Job configuration installed before this test; restored in tearDown(). */
+    private Configuration savedJobConf;
+
+    @Before
+    public void setUp() throws Exception {
+        // PigMapReduce.sJobConfInternal is static state shared by all tests.
+        // checkInputAndOutput() may set pig.cachedbag.memusage to 0, while
+        // checkSingleRow() and testPartialAggNoInput() never touch it, so
+        // without this reset their outcome depended on which test ran first.
+        savedJobConf = PigMapReduce.sJobConfInternal.get();
+        PigMapReduce.sJobConfInternal.set(new Configuration());
+        createPOPartialPlan();
+    }
+
+    /**
+     * Builds partAggOp and its parent plan. The key plan projects input
+     * column 0 as an INTEGER; the single value plan projects column 1 as a
+     * BAG and feeds it to {@code IntSum.Intermediate}.
+     *
+     * @throws PlanException propagated from assembling the physical plans
+     */
+    private void createPOPartialPlan() throws PlanException {
+        parentPlan = new PhysicalPlan();
+        partAggOp = GenPhyOp.topPOPartialAgg();
+        partAggOp.setParentPlan(parentPlan);
+
+        // key plan: project column 0
+        PhysicalPlan keyPlan = new PhysicalPlan();
+        POProject keyProj = new POProject(GenPhyOp.getOK(), -1, 0);
+        keyProj.setResultType(DataType.INTEGER);
+        keyPlan.add(keyProj);
+        partAggOp.setKeyPlan(keyPlan);
+
+        // value plan: project column 1 as the argument of the udf ...
+        PhysicalPlan valPlan1 = new PhysicalPlan();
+        POProject projVal1 = new POProject(GenPhyOp.getOK(), -1, 1);
+        projVal1.setResultType(DataType.BAG);
+        valPlan1.add(projVal1);
+
+        // ... and feed it to the intermediate stage of SUM
+        List<PhysicalOperator> udfInps = new ArrayList<PhysicalOperator>();
+        udfInps.add(projVal1);
+        FuncSpec sumSpec = new FuncSpec(IntSum.Intermediate.class.getName());
+        POUserFunc sumUdf = new POUserFunc(GenPhyOp.getOK(), -1, udfInps,
+                sumSpec);
+        valPlan1.add(sumUdf);
+        valPlan1.connect(projVal1, sumUdf);
+
+        List<PhysicalPlan> valuePlans = new ArrayList<PhysicalPlan>();
+        valuePlans.add(valPlan1);
+
+        partAggOp.setValuePlans(valuePlans);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        // Do not leak this test's job configuration (possibly with zero
+        // hash-map memory) into later tests running in the same JVM.
+        PigMapReduce.sJobConfInternal.set(savedJobConf);
+    }
+
+    @Test
+    public void testPartialOneInput1() throws ExecException, ParserException {
+        // input tuple has key, and bag containing SUM.Init output
+        String[] tups1 = { "(1,(2L))" };
+        Tuple t = Util.getTuplesFromConstantTupleStrings(tups1).get(0);
+        checkSingleRow(t);
+    }
+
+    @Test
+    public void testPartialOneInput2() throws ExecException, ParserException {
+        // input tuple has a null key, and bag containing SUM.Init output
+        String[] tups1 = { "(null,(2L))" };
+        Tuple t = Util.getTuplesFromConstantTupleStrings(tups1).get(0);
+        checkSingleRow(t);
+    }
+
+    @Test
+    public void testPartialOneInput3() throws ExecException, ParserException {
+        // input tuple has key, and a null SUM.Init output
+        String[] tups1 = { "(1,(null))" };
+        Tuple t = Util.getTuplesFromConstantTupleStrings(tups1).get(0);
+        checkSingleRow(t);
+    }
+
+    /**
+     * Feeds a single tuple to partAggOp and checks that it is held back
+     * (STATUS_EOP) until end of input, after which the same tuple is returned
+     * with STATUS_OK.
+     *
+     * @param t the only input tuple; also the expected output
+     * @throws ExecException if the operator fails while processing
+     */
+    private void checkSingleRow(Tuple t) throws ExecException {
+        Result res;
+        // attaching one input tuple, result tuple stays in operator, expect EOP
+        partAggOp.attachInput(t);
+        res = partAggOp.getNext(dummyTuple);
+        assertEquals(POStatus.STATUS_EOP, res.returnStatus);
+
+        // end of all input, now expecting results
+        parentPlan.endOfAllInput = true;
+        res = partAggOp.getNext(dummyTuple);
+        assertEquals(POStatus.STATUS_OK, res.returnStatus);
+        assertEquals(t, res.result);
+    }
+
+    @Test
+    public void testPartialAggNoInput() throws ExecException, ParserException {
+
+        // nothing attached, expecting EOP
+        Result res = partAggOp.getNext(dummyTuple);
+        assertEquals(POStatus.STATUS_EOP, res.returnStatus);
+
+        // end of all input, still no results
+        parentPlan.endOfAllInput = true;
+        res = partAggOp.getNext(dummyTuple);
+        assertEquals(POStatus.STATUS_EOP, res.returnStatus);
+
+    }
+
+    @Test
+    public void testPartialMultiInput1() throws Exception {
+        // input tuple has key, and bag containing SUM.Init output
+        String[] inputTups = { "(1,(1L))", "(1,(2L))", "(2,(1L))" };
+        String[] outputTups = { "(1,(3L))", "(2,(1L))" };
+        checkInputAndOutput(inputTups, outputTups, false);
+    }
+
+    @Test
+    public void testPartialMultiInput2() throws Exception {
+        // input tuple has key, and bag containing SUM.Init output
+        String[] inputTups = { "(1,(1L))", "(2,(2L))", "(1,(2L))" };
+        String[] outputTups = { "(1,(3L))", "(2,(2L))" };
+        checkInputAndOutput(inputTups, outputTups, false);
+    }
+
+    @Test
+    public void testPartialMultiInput3() throws Exception {
+        // input tuple has key, and bag containing SUM.Init output
+        String[] inputTups = { "(null,(1L))", "(null,(2L))", "(null,(2L))" };
+        String[] outputTups = { "(null,(5L))" };
+        checkInputAndOutput(inputTups, outputTups, false);
+    }
+
+    @Test
+    public void testPartialMultiInput4() throws Exception {
+        // input tuple has key, and bag containing SUM.Init output
+        String[] inputTups = { "(1,(1L))", "(2,(2L))", "(null,(2L))" };
+        String[] outputTups = { "(1,(1L))", "(2,(2L))", "(null,(2L))" };
+        checkInputAndOutput(inputTups, outputTups, false);
+    }
+
+    // The case where there is no memory for use by hashmap
+    @Test
+    public void testPartialMultiInputHashMemEmpty1() throws Exception {
+        // input tuple has key, and bag containing SUM.Init output
+        String[] inputTups = { "(1,(1L))", "(2,(2L))", "(null,(2L))" };
+        String[] outputTups = { "(1,(1L))", "(2,(2L))", "(null,(2L))" };
+        checkInputAndOutput(inputTups, outputTups, true);
+    }
+
+    // The case where there is no memory for use by hashmap
+    @Test
+    public void testPartialMultiInputHashMemEmpty2() throws Exception {
+        // input tuple has key, and bag containing SUM.Init output
+        String[] inputTups = { "(1,(1L))", "(2,(2L))", "(1,(2L))" };
+        // since the group keys with same value are not in consecutive rows
+        // and hashmap is not given any memory they don't get
+        // aggregated with POPartialAgg
+        String[] outputTups = { "(1,(1L))", "(2,(2L))", "(1,(2L))" };
+        checkInputAndOutput(inputTups, outputTups, true);
+    }
+
+    @Test
+    public void testPartialMultiInput1HashMemEmpty() throws Exception {
+        // input tuple has key, and bag containing SUM.Init output
+        // gby keys in consecutive row, they get aggregated even when
+        // hashmap is not given any memory
+        String[] inputTups = { "(1,(1L))", "(1,(2L))", "(2,(1L))" };
+        String[] outputTups = { "(1,(3L))", "(2,(1L))" };
+        checkInputAndOutput(inputTups, outputTups, true);
+    }
+
+    @Test
+    public void testMultiInput1HashMemEmpty() throws Exception {
+        // NOTE(review): despite its name this runs WITH hash-map memory
+        // (isMapMemEmpty=false) and duplicates testPartialMultiInput2.
+        // input tuple has key, and bag containing SUM.Init output
+        String[] inputTups = { "(1,(1L))", "(2,(2L))", "(1,(2L))" };
+        String[] outputTups = { "(1,(3L))", "(2,(2L))" };
+        checkInputAndOutput(inputTups, outputTups, false);
+    }
+
+    @Test
+    public void testPartialMultiInputMultiInput1HashMemEmpty() throws Exception {
+        // NOTE(review): despite its name this runs WITH hash-map memory
+        // (isMapMemEmpty=false) and duplicates testPartialMultiInput3.
+        // input tuple has key, and bag containing SUM.Init output
+        String[] inputTups = { "(null,(1L))", "(null,(2L))", "(null,(2L))" };
+        String[] outputTups = { "(null,(5L))" };
+        checkInputAndOutput(inputTups, outputTups, false);
+    }
+
+
+    /**
+     * Runs partAggOp over inputTups and checks that the collected output
+     * matches outputTups, ignoring order.
+     * <p>
+     * If isMapMemEmpty is true, the memory available for the operator's
+     * hash-map is set to zero (pig.cachedbag.memusage = 0); rows may then be
+     * emitted while input is still arriving, and exactly one more row is
+     * expected after end of input. Otherwise every row must stay buffered
+     * until end of input.
+     *
+     * @param inputTups constant tuple strings fed to the operator, in order
+     * @param outputTups constant tuple strings expected as output
+     * @param isMapMemEmpty whether to give the hash-map no memory
+     * @throws Exception if parsing the tuple strings or running the operator
+     *             fails
+     */
+    private void checkInputAndOutput(String[] inputTups, String[] outputTups,
+            boolean isMapMemEmpty) throws Exception {
+
+        PigMapReduce.sJobConfInternal.set(new Configuration());
+        if (isMapMemEmpty) {
+            PigMapReduce.sJobConfInternal.get().set("pig.cachedbag.memusage",
+                    "0");
+        }
+
+        List<Tuple> inputs = Util.getTuplesFromConstantTupleStrings(inputTups);
+        List<Tuple> expectedOuts = Util
+                .getTuplesFromConstantTupleStrings(outputTups);
+        List<Tuple> outputs = new ArrayList<Tuple>();
+
+        // run through the inputs
+        for (Tuple t : inputs) {
+            partAggOp.attachInput(t);
+            Result res = partAggOp.getNext(dummyTuple);
+            if (isMapMemEmpty) {
+                // without hash-map memory a previous row may be emitted here
+                addResults(res, outputs);
+            } else {
+                // the result tuple stays in the operator, expect EOP
+                assertEquals(POStatus.STATUS_EOP, res.returnStatus);
+            }
+        }
+
+        // end of all input, now expecting the remaining results
+        parentPlan.endOfAllInput = true;
+
+        if (isMapMemEmpty) {
+            // only one last output expected, then EOP
+            addResults(partAggOp.getNext(dummyTuple), outputs);
+            assertEquals(POStatus.STATUS_EOP,
+                    partAggOp.getNext(dummyTuple).returnStatus);
+        } else {
+            // drain everything buffered in the operator
+            while (addResults(partAggOp.getNext(dummyTuple), outputs)) {
+                // addResults collects each OK tuple; the loop stops on EOP
+            }
+        }
+        Util.compareActualAndExpectedResults(outputs, expectedOuts);
+    }
+
+    /**
+     * Collects one getNext() result.
+     *
+     * @param res result returned by the operator
+     * @param outputs list receiving the tuple when res is STATUS_OK
+     * @return true if a tuple was added, false on STATUS_EOP; any other
+     *         status fails the test
+     */
+    private boolean addResults(Result res, List<Tuple> outputs) {
+        if (res.returnStatus == POStatus.STATUS_EOP) {
+            return false;
+        } else if (res.returnStatus == POStatus.STATUS_OK) {
+            outputs.add((Tuple) res.result);
+            return true;
+        } else {
+            fail("Invalid result status " + res.returnStatus);
+            return false; // to keep compiler happy
+        }
+    }
+
+}
Added: pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlan.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlan.java?rev=1171195&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlan.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestPOPartialAggPlan.java Thu Sep 15 17:51:27 2011
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.util.Iterator;
+import java.util.Properties;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
+import org.apache.pig.impl.PigContext;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests that the MapReduce compiler adds a {@link POPartialAgg} to the map
+ * plan of a group-by job only when map-side partial aggregation is enabled
+ * through {@link MapReduceLauncher#PROP_EXEC_MAP_PARTAGG} and the foreach
+ * following the group consists of combinable aggregate functions.
+ */
+public class TestPOPartialAggPlan {
+    PigContext pc;
+
+    @Before
+    public void setUp() throws ExecException {
+        // a fresh context per test, so property changes do not leak
+        pc = new PigContext(ExecType.LOCAL, new Properties());
+        pc.connect();
+    }
+
+    @Test
+    public void testNoMapAggProp() throws Exception {
+        // test with pig.exec.mapPartAgg not set
+        MROperPlan mrp = buildSingleJobPlan(getGByQuery());
+
+        assertNull("POPartialAgg should be absent", findPOPartialAgg(mrp));
+    }
+
+    @Test
+    public void testMapAggPropFalse() throws Exception {
+        // test with pig.exec.mapPartAgg set to false
+        pc.getProperties().setProperty(MapReduceLauncher.PROP_EXEC_MAP_PARTAGG, "false");
+        MROperPlan mrp = buildSingleJobPlan(getGByQuery());
+
+        assertNull("POPartialAgg should be absent", findPOPartialAgg(mrp));
+    }
+
+    @Test
+    public void testMapAggPropTrue() throws Exception {
+        // test with pig.exec.mapPartAgg set to true
+        pc.getProperties().setProperty(MapReduceLauncher.PROP_EXEC_MAP_PARTAGG, "true");
+        MROperPlan mrp = buildSingleJobPlan(getGByQuery());
+
+        assertNotNull("POPartialAgg should be present", findPOPartialAgg(mrp));
+    }
+
+    /**
+     * Group-by query whose foreach applies a single combinable aggregate
+     * (COUNT).
+     */
+    private String getGByQuery() {
+        return "l = load 'x' as (a,b,c);" +
+                "g = group l by a;" +
+                "f = foreach g generate group, COUNT(l.b);";
+    }
+
+    @Test
+    public void testMapAggNoAggFunc() throws Exception {
+        // no agg func, so there should not be a POPartialAgg
+        String query = "l = load 'x' as (a,b,c);" +
+                "g = group l by a;" +
+                "f = foreach g generate group;";
+        pc.getProperties().setProperty(MapReduceLauncher.PROP_EXEC_MAP_PARTAGG, "true");
+        MROperPlan mrp = buildSingleJobPlan(query);
+
+        assertNull("POPartialAgg should be absent", findPOPartialAgg(mrp));
+    }
+
+    @Test
+    public void testMapAggNotCombinable() throws Exception {
+        // projecting the bag l.b is not combinable, so no POPartialAgg
+        String query = "l = load 'x' as (a,b,c);" +
+                "g = group l by a;" +
+                "f = foreach g generate group, COUNT(l.b), l.b;";
+        pc.getProperties().setProperty(MapReduceLauncher.PROP_EXEC_MAP_PARTAGG, "true");
+        MROperPlan mrp = buildSingleJobPlan(query);
+
+        assertNull("POPartialAgg should be absent", findPOPartialAgg(mrp));
+    }
+
+    /**
+     * Compiles query with the current PigContext and asserts that it yields
+     * exactly one MapReduce job.
+     *
+     * @param query Pig Latin script to compile
+     * @return the compiled MapReduce plan
+     * @throws Exception if parsing or compilation fails
+     */
+    private MROperPlan buildSingleJobPlan(String query) throws Exception {
+        MROperPlan mrp = Util.buildMRPlan(query, pc);
+        // expected value first, so failure messages read correctly
+        assertEquals("expected a single MR job", 1, mrp.size());
+        return mrp;
+    }
+
+    /**
+     * Returns the POPartialAgg in the map plan of the first root job of mrp,
+     * or null if there is none.
+     */
+    private POPartialAgg findPOPartialAgg(MROperPlan mrp) {
+        PhysicalPlan mapPlan = mrp.getRoots().get(0).mapPlan;
+        return findPOPartialAgg(mapPlan);
+    }
+
+    /**
+     * Returns the first POPartialAgg produced by mapPlan's operator iterator,
+     * or null if there is none.
+     */
+    private POPartialAgg findPOPartialAgg(PhysicalPlan mapPlan) {
+        Iterator<PhysicalOperator> it = mapPlan.iterator();
+        while (it.hasNext()) {
+            PhysicalOperator op = it.next();
+            if (op instanceof POPartialAgg) {
+                return (POPartialAgg) op;
+            }
+        }
+        return null;
+    }
+
+}
Modified: pig/trunk/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/Util.java?rev=1171195&r1=1171194&r2=1171195&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ pig/trunk/test/org/apache/pig/test/Util.java Thu Sep 15 17:51:27 2011
@@ -64,8 +64,10 @@ import org.apache.pig.backend.executione
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
@@ -82,6 +84,7 @@ import org.apache.pig.impl.plan.Compilat
import org.apache.pig.impl.util.LogUtils;
import org.apache.pig.newplan.logical.optimizer.LogicalPlanPrinter;
import org.apache.pig.newplan.logical.optimizer.SchemaResetter;
+import org.apache.pig.newplan.logical.relational.LOStore;
import org.apache.pig.newplan.logical.relational.LogToPhyTranslationVisitor;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.newplan.logical.optimizer.UidResetter;
@@ -457,14 +460,23 @@ public class Util {
actualResList.add(actualResultsIt.next());
}
+ compareActualAndExpectedResults(actualResList, expectedResList);
+
+ }
+
+
+
+ /**
+ * Asserts that two tuple lists hold the same tuples, ignoring order.
+ * Both lists are sorted in place with Collections.sort before being
+ * compared, so the caller's lists are reordered as a side effect.
+ *
+ * @param actualResList tuples produced by the code under test
+ * @param expectedResList tuples the test expects
+ */
+ static public void compareActualAndExpectedResults(
+ List<Tuple> actualResList, List<Tuple> expectedResList) {
Collections.sort(actualResList);
Collections.sort(expectedResList);
Assert.assertEquals("Comparing actual and expected results. ",
expectedResList, actualResList);
- }
+
+ }
- /**
+ /**
* Check if subStr is a subString of str . calls org.junit.Assert.fail if it is not
* @param str
* @param subStr
@@ -770,6 +782,14 @@ public class Util {
return (MROperPlan) compile.invoke(launcher, new Object[] { pp, pc });
}
+    /**
+     * Compiles a Pig Latin query all the way to a MapReduce plan: parse it,
+     * run the new logical-plan optimizer, translate to a physical plan, then
+     * build the MR plan with the optimizer enabled.
+     *
+     * @param query Pig Latin script to compile
+     * @param pc context used for parsing and compilation
+     * @return the resulting MapReduce operator plan
+     * @throws Exception if any compilation stage fails
+     */
+    public static MROperPlan buildMRPlan(String query, PigContext pc) throws Exception {
+        LogicalPlan logicalPlan = Util.parse(query, pc);
+        Util.optimizeNewLP(logicalPlan);
+        PhysicalPlan physicalPlan = Util.buildPhysicalPlanFromNewLP(logicalPlan, pc);
+        return Util.buildMRPlanWithOptimizer(physicalPlan, pc);
+    }
+
public static void registerMultiLineQuery(PigServer pigServer, String query) throws IOException {
File f = File.createTempFile("tmp", "");
PrintWriter pw = new PrintWriter(f);
@@ -1018,5 +1038,7 @@ public class Util {
}
}
+
+
}
Modified: pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java?rev=1171195&r1=1171194&r2=1171195&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java (original)
+++ pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java Thu Sep 15 17:51:27 2011
@@ -186,6 +186,11 @@ public class GenPhyOp{
return ret;
}
+    /**
+     * Creates a bare POPartialAgg whose operator key comes from getOK().
+     * Key and value plans must be attached by the caller.
+     *
+     * @return a new POPartialAgg
+     */
+    public static POPartialAgg topPOPartialAgg(){
+        return new POPartialAgg(getOK());
+    }
+
/**
* creates the PlansAndFlattens struct for
* generate grpCol, *.
@@ -899,7 +904,7 @@ public class GenPhyOp{
return ep;
}
- private static OperatorKey getOK(){
+ public static OperatorKey getOK(){
return new OperatorKey("",r.nextLong());
}