You are viewing a plain-text version of this content. (The link to the canonical version, labelled "here" in the original page, was not preserved in this copy.)
Posted to commits@pig.apache.org by ga...@apache.org on 2008/05/16 03:09:32 UTC
svn commit: r656913 [2/3] - in /incubator/pig/branches/types: ./
src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/
src/org/apache/pig/impl/builtin/ src/org/apache/pig/impl/mapReduceLayer...
Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POUserFunc.java?rev=656913&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POUserFunc.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/expressionOperators/POUserFunc.java Thu May 15 18:09:30 2008
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.Algebraic;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.plans.ExprPlanVisitor;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Physical expression operator that evaluates a user-defined {@link EvalFunc}.
+ * <p>
+ * The function instance is {@code transient}; whenever it is needed and not
+ * present it is (re)created from {@link #funcSpec}. Because of that,
+ * {@link #funcSpec} must always describe the function that should run.
+ */
+public class POUserFunc extends ExpressionOperator {
+
+    // Lazily instantiated from funcSpec; transient, so it is not serialized
+    // together with the operator.
+    transient EvalFunc func;
+    Tuple t1, t2;
+    private final Log log = LogFactory.getLog(getClass());
+    // Spec handed to PigContext.instantiateFuncFromSpec to create func.
+    String funcSpec;
+    // Values accepted by setAlgebraicFunction().
+    private static final byte INITIAL = 0;
+    private static final byte INTERMEDIATE = 1;
+    private static final byte FINAL = 2;
+
+    /**
+     * Creates an operator without a function spec.
+     * NOTE(review): funcSpec stays null here, so lazy instantiation cannot
+     * work for operators built this way -- confirm callers supply func.
+     */
+    public POUserFunc(OperatorKey k, int rp, List inp) {
+        super(k, rp);
+        inputs = inp;
+    }
+
+    /** Creates an operator for {@code funcSpec} and instantiates the function immediately. */
+    public POUserFunc(OperatorKey k, int rp, List inp, String funcSpec) {
+        this(k, rp, inp, funcSpec, null);
+        instantiateFunc();
+    }
+
+    /**
+     * Creates an operator for {@code funcSpec} with an already-built function
+     * instance; if {@code func} is null it is instantiated on first use.
+     */
+    public POUserFunc(OperatorKey k, int rp, List inp, String funcSpec, EvalFunc func) {
+        super(k, rp);
+        super.setInputs(inp);
+        this.funcSpec = funcSpec;
+        this.func = func;
+    }
+
+    /** Creates {@link #func} from the current {@link #funcSpec}. */
+    private void instantiateFunc() {
+        this.func = (EvalFunc) PigContext.instantiateFuncFromSpec(this.funcSpec);
+    }
+
+    /** Instantiates {@link #func} if it has not been created yet. */
+    private void ensureFunc() {
+        if (func == null) {
+            instantiateFunc();
+        }
+    }
+
+    /**
+     * Evaluates the function once.
+     * <p>
+     * If an input tuple is attached, the function runs on it, and a null
+     * result is reported as STATUS_EOP. Otherwise the next tuple is pulled
+     * from the first input operator. Any non-OK status from that input (EOP,
+     * error, ...) is propagated unchanged instead of being fed to the function.
+     *
+     * @return the function's result, or STATUS_ERR if it threw an IOException
+     */
+    private Result getNext() throws ExecException {
+        Result result = new Result();
+        ensureFunc();
+
+        try {
+            if (inputAttached) {
+                result.result = func.exec(input);
+                result.returnStatus = (result.result != null) ? POStatus.STATUS_OK
+                        : POStatus.STATUS_EOP;
+                return result;
+            }
+
+            Tuple t = null; // typed null selects the getNext(Tuple) overload
+            Result in = inputs.get(0).getNext(t);
+            if (in.returnStatus != POStatus.STATUS_OK) {
+                result.returnStatus = in.returnStatus;
+                return result;
+            }
+            result.result = func.exec((Tuple) in.result);
+            result.returnStatus = POStatus.STATUS_OK;
+            return result;
+        } catch (IOException e) {
+            // Reported to the caller as STATUS_ERR; keep the stack trace in the log.
+            log.error("Error while executing user function " + funcSpec, e);
+        }
+        result.returnStatus = POStatus.STATUS_ERR;
+        return result;
+    }
+
+    // All typed getNext overloads delegate to getNext(): the UDF itself
+    // determines the type of the produced value.
+
+    @Override
+    public Result getNext(Tuple tIn) throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNext(DataBag db) throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNext(Integer i) throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNext(Boolean b) throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNext(DataByteArray ba) throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNext(Double d) throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNext(Float f) throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNext(Long l) throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNext(Map m) throws ExecException {
+        return getNext();
+    }
+
+    @Override
+    public Result getNext(String s) throws ExecException {
+        return getNext();
+    }
+
+    /**
+     * Switches this operator to the initial (0), intermediate (1) or final (2)
+     * stage of its algebraic function.
+     * <p>
+     * Intended for the optimizer, so the map/combine/reduce stages can use the
+     * right function without replacing operators in the physical plan. Both
+     * {@link #funcSpec} and {@link #func} are replaced, so the choice survives
+     * re-instantiation of the transient function. Calling this twice on the
+     * same operator is not supported, because the second call would query the
+     * already-substituted function.
+     *
+     * @param function one of 0 (initial), 1 (intermediate), 2 (final); other
+     *                 values are logged and ignored
+     * @throws IllegalStateException if the current function is not {@link Algebraic}
+     */
+    public void setAlgebraicFunction(Byte function) {
+        String spec;
+        switch (function) {
+        case INITIAL:
+            spec = getInitial();
+            break;
+        case INTERMEDIATE:
+            spec = getIntermed();
+            break;
+        case FINAL:
+            spec = getFinal();
+            break;
+        default:
+            log.error("Unknown algebraic function type: " + function);
+            return;
+        }
+        if (spec == null) {
+            // getInitial/getIntermed/getFinal return null (and log) for
+            // non-algebraic functions; instantiating a null spec cannot work.
+            throw new IllegalStateException("Function " + funcSpec
+                    + " is not algebraic");
+        }
+        funcSpec = spec;
+        instantiateFunc();
+        setResultType(DataType.findType(func.getReturnType()));
+    }
+
+    /**
+     * Returns {@link #func} as an {@link Algebraic}, instantiating it first if
+     * needed; logs an error and returns null if it is not algebraic.
+     */
+    private Algebraic asAlgebraic() {
+        ensureFunc();
+        if (func instanceof Algebraic) {
+            return (Algebraic) func;
+        }
+        log.error("Attempt to run a non-algebraic function as an algebraic function");
+        return null;
+    }
+
+    /** @return the spec of the initial-stage function, or null if func is not algebraic */
+    public String getInitial() {
+        Algebraic alg = asAlgebraic();
+        return (alg == null) ? null : alg.getInitial();
+    }
+
+    /** @return the spec of the intermediate-stage function, or null if func is not algebraic */
+    public String getIntermed() {
+        Algebraic alg = asAlgebraic();
+        return (alg == null) ? null : alg.getIntermed();
+    }
+
+    /** @return the spec of the final-stage function, or null if func is not algebraic */
+    public String getFinal() {
+        Algebraic alg = asAlgebraic();
+        return (alg == null) ? null : alg.getFinal();
+    }
+
+    /** @return the function's declared return type */
+    public Type getReturnType() {
+        ensureFunc();
+        return func.getReturnType();
+    }
+
+    /** Forwards {@code finish()} to the function. */
+    public void finish() {
+        ensureFunc();
+        func.finish();
+    }
+
+    /** @return the function's output schema for the given input schema */
+    public Schema outputSchema(Schema input) {
+        ensureFunc();
+        return func.outputSchema(input);
+    }
+
+    /** @return whether the function reports itself as asynchronous */
+    public Boolean isAsynchronous() {
+        ensureFunc();
+        return func.isAsynchronous();
+    }
+
+    @Override
+    public String name() {
+        String spec = (funcSpec != null) ? funcSpec : "DummySpec";
+        return "POUserFunc" + "(" + spec + ")" + " - " + mKey.toString();
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    @Override
+    public void visit(ExprPlanVisitor v) throws VisitorException {
+        v.visitUserFunc(this);
+    }
+
+    /** @return the spec of the function currently evaluated by this operator */
+    public String getFuncSpec() {
+        return funcSpec;
+    }
+
+}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanWalker.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanWalker.java?rev=656913&r1=656912&r2=656913&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanWalker.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/plan/PlanWalker.java Thu May 15 18:09:30 2008
@@ -56,9 +56,14 @@
*/
public abstract PlanWalker<O, P> spawnChildWalker(P plan);
- public P getPlan() {
+    /** Returns the plan held by this walker ({@code mPlan}). */
+ public P getPlan() {
return mPlan ;
}
+
+    /**
+     * Replaces the plan held by this walker.
+     *
+     * @param plan the new plan; stored as given, without a null check
+     */
+    public void setPlan(P plan) {
+        this.mPlan = plan;
+    }
+
}
Added: incubator/pig/branches/types/test/org/apache/pig/test/TestJobSubmission.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestJobSubmission.java?rev=656913&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestJobSubmission.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestJobSubmission.java Thu May 15 18:09:30 2008
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecutionEngine;
+import org.apache.pig.backend.hadoop.datastorage.HConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
+import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.DefaultBagFactory;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.mapReduceLayer.JobControlCompiler;
+import org.apache.pig.impl.mapReduceLayer.MRCompiler;
+import org.apache.pig.impl.mapReduceLayer.MapReduceLauncher;
+import org.apache.pig.impl.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
+import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POFilter;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POForEach;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POGenerate;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POGlobalRearrange;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POLoad;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POLocalRearrange;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POPackage;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POSplit;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POStore;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POUnion;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ConstantExpression;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POCast;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.ComparisonOperator;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.test.utils.GenPhyOp;
+import org.apache.pig.test.utils.GenRandomData;
+import org.apache.pig.test.utils.TestHelper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestJobSubmission extends junit.framework.TestCase{
+
+
+ static PigContext pc;
+ String ldFile;
+ String expFile;
+ PhysicalPlan<PhysicalOperator> php = new PhysicalPlan<PhysicalOperator>();
+ String stFile;
+ String hadoopLdFile;
+ String grpName;
+ Random r = new Random();
+ String curDir;
+ String inpDir;
+ String golDir;
+
+ static {
+ MiniCluster cluster = MiniCluster.buildCluster();
+ pc = new PigContext();
+ try {
+ pc.connect();
+ } catch (ExecException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ GenPhyOp.setPc(pc);
+
+ }
+ @Before
+ public void setUp() throws Exception{
+ curDir = System.getProperty("user.dir");
+ inpDir = curDir + File.separatorChar + "test/org/apache/pig/test/data/InputFiles/";
+ golDir = curDir + File.separatorChar + "test/org/apache/pig/test/data/GoldenFiles/";
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ private void generateInput(int numTuples) throws ExecException{
+
+ DataBag inpDb = GenRandomData.genRandSmallTupDataBag(r, numTuples, 1000);
+
+ POProject proj = new POProject(new OperatorKey("", r.nextLong()));
+ Tuple t = new DefaultTuple();
+ t.append(inpDb);
+ proj.attachInput(t);
+ proj.setColumn(0);
+ proj.setOverloaded(true);
+ proj.setResultType(DataType.TUPLE);
+
+ List<PhysicalOperator> inps = new ArrayList<PhysicalOperator>();
+ inps.add(proj);
+
+ POStore str = new POStore(new OperatorKey("", r.nextLong()));
+ str.setInputs(inps);
+
+ FileSpec fSpec = new FileSpec(ldFile,PigStorage.class.getName());
+
+ str.setSFile(fSpec);
+ str.setPc(pc);
+ str.store();
+ }
+
+ private void setUp1(boolean gen) throws Exception {
+ ldFile = "file:" + inpDir + "jsTst1.txt";
+ expFile = ldFile;
+ stFile = "jsTst1";
+ grpName = "jobSubTst1";
+
+ if(gen){
+ generateInput(100);
+ return;
+ }
+
+ hadoopLdFile = FileLocalizer.hadoopify(ldFile, pc);
+
+ FileSpec LFSpec = new FileSpec(hadoopLdFile,PigStorage.class.getName());
+ FileSpec SFSpec = new FileSpec(stFile,PigStorage.class.getName());
+
+ POLoad ld = new POLoad(new OperatorKey("", r.nextLong()));
+ POStore st = new POStore(new OperatorKey("", r.nextLong()));
+ ld.setPc(pc);
+ ld.setLFile(LFSpec);
+ st.setPc(pc);
+ st.setSFile(SFSpec);
+
+ php.add(ld);
+ php.add(st);
+ php.connect(ld, st);
+ }
+
+// @Test
+ public void testCompile1() throws Exception {
+ boolean gen = false;
+
+ setUp1(gen);
+
+ if(gen)
+ return;
+
+ submit();
+
+ assertEquals(true, FileLocalizer.fileExists(stFile, pc));
+
+ FileSpec fSpecExp = new FileSpec(expFile,PigStorage.class.getName());
+ FileSpec fSpecAct = new FileSpec(stFile,PigStorage.class.getName());
+
+ assertEquals(true, TestHelper.areFilesSame(fSpecExp, fSpecAct, pc));
+ }
+
+ private void setUp2(boolean gen) throws Exception {
+ ldFile = "file:" + inpDir + "jsTst2.txt";
+ expFile = ldFile;
+ stFile = "jsTst2";
+ grpName = "jobSubTst2";
+
+ if(gen){
+ generateInput(1000);
+ return;
+ }
+
+ hadoopLdFile = FileLocalizer.hadoopify(ldFile, pc);
+
+ FileSpec LFSpec = new FileSpec(hadoopLdFile,PigStorage.class.getName());
+ FileSpec SFSpec = new FileSpec(stFile,PigStorage.class.getName());
+
+ POLoad ld = new POLoad(new OperatorKey("", r.nextLong()));
+ POStore st = new POStore(new OperatorKey("", r.nextLong()));
+ ld.setPc(pc);
+ ld.setLFile(LFSpec);
+ st.setPc(pc);
+ st.setSFile(SFSpec);
+
+ php.add(ld);
+ php.add(st);
+ php.connect(ld, st);
+ }
+
+// @Test
+ public void testCompile2() throws Exception {
+ boolean gen = false;
+
+ setUp2(gen);
+
+ if(gen)
+ return;
+
+ submit();
+
+ assertEquals(true, FileLocalizer.fileExists(stFile, pc));
+
+ FileSpec fSpecExp = new FileSpec(expFile,PigStorage.class.getName());
+ FileSpec fSpecAct = new FileSpec(stFile,PigStorage.class.getName());
+
+ assertEquals(true, TestHelper.areFilesSame(fSpecExp, fSpecAct, pc));
+ }
+
+ private void setUp3(boolean gen) throws Exception {
+ ldFile = "file:" + inpDir + "jsTst1.txt";
+ expFile = "file:" + golDir + "jsTst3";
+ stFile = "jsTst3";
+ grpName = "jobSubTst3";
+
+ if(gen){
+ generateInput(1000);
+ return;
+ }
+
+ hadoopLdFile = FileLocalizer.hadoopify(ldFile, pc);
+
+ FileSpec LFSpec = new FileSpec(hadoopLdFile,PigStorage.class.getName());
+ FileSpec SFSpec = new FileSpec(stFile,PigStorage.class.getName());
+
+ POLoad ld = new POLoad(new OperatorKey("", r.nextLong()));
+ POStore st = new POStore(new OperatorKey("", r.nextLong()));
+ ld.setPc(pc);
+ ld.setLFile(LFSpec);
+ st.setPc(pc);
+ st.setSFile(SFSpec);
+
+ int[] flds = {0,1};
+ Tuple sample = new DefaultTuple();
+ sample.append(new String("S"));
+ sample.append(new Integer("10"));
+
+ POForEach fe = GenPhyOp.topForEachOPWithPlan(flds , sample);
+
+ POFilter fl = GenPhyOp.topFilterOpWithProj(1, 500, GenPhyOp.LT);
+
+ php.add(ld);
+ php.add(fe);
+ php.connect(ld, fe);
+
+ php.add(fl);
+ php.connect(fe, fl);
+
+ php.add(st);
+ php.connect(fl, st);
+ }
+
+// @Test
+ public void testCompile3() throws Exception {
+ boolean gen = false;
+
+ setUp3(gen);
+
+ if(gen)
+ return;
+
+ submit();
+
+ assertEquals(true, FileLocalizer.fileExists(stFile, pc));
+
+ FileSpec fSpecExp = new FileSpec(expFile,PigStorage.class.getName()+"(',')");
+ FileSpec fSpecAct = new FileSpec(stFile,PigStorage.class.getName());
+
+ assertEquals(true, TestHelper.areFilesSame(fSpecExp, fSpecAct, pc));
+ }
+
+ private void setUp4(boolean gen) throws Exception {
+ ldFile = "file:" + inpDir + "jsTst1.txt";
+ expFile = "file:" + golDir + "jsTst4";
+ stFile = "jsTst4";
+ grpName = "jobSubTst4";
+
+ if(gen){
+ generateInput(1000);
+ return;
+ }
+
+ hadoopLdFile = FileLocalizer.hadoopify(ldFile, pc);
+
+ FileSpec LFSpec = new FileSpec(hadoopLdFile,PigStorage.class.getName());
+ FileSpec SFSpec = new FileSpec(stFile,PigStorage.class.getName());
+
+ POLoad ld = new POLoad(new OperatorKey("", r.nextLong()));
+ POStore st = new POStore(new OperatorKey("", r.nextLong()));
+ ld.setPc(pc);
+ ld.setLFile(LFSpec);
+ st.setPc(pc);
+ st.setSFile(SFSpec);
+
+ POSplit spl = GenPhyOp.topSplitOp();
+ POFilter fl1 = GenPhyOp.topFilterOpWithProjWithCast(1, 200, GenPhyOp.LT);
+ POFilter fl2 = GenPhyOp.topFilterOpWithProjWithCast(1, 800, GenPhyOp.GT);
+
+ POUnion un = GenPhyOp.topUnionOp();
+
+ php.add(ld);
+ php.add(spl);
+ php.connect(ld, spl);
+
+ php.add(fl1);
+ php.connect(spl, fl1);
+
+ php.add(fl2);
+ php.connect(spl, fl2);
+
+ php.add(un);
+ php.connect(fl1, un);
+ php.connect(fl2, un);
+
+ php.add(st);
+ php.connect(un, st);
+ }
+
+// @Test
+ public void testCompile4() throws Exception {
+ boolean gen = false;
+
+ setUp4(gen);
+
+ if(gen)
+ return;
+
+ submit();
+
+ assertEquals(true, FileLocalizer.fileExists(stFile, pc));
+
+ FileSpec fSpecExp = new FileSpec(expFile,PigStorage.class.getName()+"(',')");
+ FileSpec fSpecAct = new FileSpec(stFile,PigStorage.class.getName());
+
+ assertEquals(true, TestHelper.areFilesSame(fSpecExp, fSpecAct, pc));
+
+ }
+
+ private void setUp5(boolean gen) throws Exception {
+ ldFile = "file:" + inpDir + "jsTst5.txt";
+ expFile = ldFile;
+ stFile = "jsTst5";
+ grpName = "jobSubTst5";
+
+ if(gen){
+ generateInput(1000);
+ return;
+ }
+
+ hadoopLdFile = FileLocalizer.hadoopify(ldFile, pc);
+
+ FileSpec LFSpec = new FileSpec(hadoopLdFile,PigStorage.class.getName()+"(',')");
+ FileSpec SFSpec = new FileSpec(stFile,PigStorage.class.getName());
+
+ POLoad ld = new POLoad(new OperatorKey("", r.nextLong()));
+ POStore st = new POStore(new OperatorKey("", r.nextLong()));
+ ld.setPc(pc);
+ ld.setLFile(LFSpec);
+ st.setPc(pc);
+ st.setSFile(SFSpec);
+
+ Tuple sample = new DefaultTuple();
+ sample.append("S");
+ sample.append(1);
+ POLocalRearrange lr = GenPhyOp.topLocalRearrangeOPWithPlan(0, 1, sample);
+
+ POGlobalRearrange gr = GenPhyOp.topGlobalRearrangeOp();
+
+ POPackage pk = GenPhyOp.topPackageOp();
+ pk.setKeyType(DataType.INTEGER);
+ pk.setNumInps(1);
+ boolean[] inner = {false};
+ pk.setInner(inner);
+
+ POForEach fe = GenPhyOp.topForEachOPWithPlan(1);
+
+ php.add(ld);
+ php.add(lr);
+ php.connect(ld, lr);
+
+ php.add(gr);
+ php.connect(lr, gr);
+
+ php.add(pk);
+ php.connect(gr, pk);
+
+ php.add(fe);
+ php.connect(pk, fe);
+
+ php.add(st);
+ php.connect(fe, st);
+ }
+
+ @Test
+ public void testCompile5() throws Exception {
+ boolean gen = false;
+
+ setUp5(gen);
+
+ if(gen)
+ return;
+
+ submit();
+
+ assertEquals(true, FileLocalizer.fileExists(stFile, pc));
+
+ FileSpec fSpecExp = new FileSpec(expFile,PigStorage.class.getName()+"(',')");
+ FileSpec fSpecAct = new FileSpec(stFile,PigStorage.class.getName());
+
+ assertEquals(true, TestHelper.areFilesSame(fSpecExp, fSpecAct, pc));
+
+ }
+
+ private void submit() throws Exception{
+ assertEquals(true, FileLocalizer.fileExists(hadoopLdFile, pc));
+ MapReduceLauncher mrl = new MapReduceLauncher();
+ mrl.launchPig(php, grpName, pc);
+ }
+}
Added: incubator/pig/branches/types/test/org/apache/pig/test/TestLocalJobSubmission.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLocalJobSubmission.java?rev=656913&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestLocalJobSubmission.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestLocalJobSubmission.java Thu May 15 18:09:30 2008
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.jobcontrol.Job;
+import org.apache.hadoop.mapred.jobcontrol.JobControl;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecutionEngine;
+import org.apache.pig.backend.hadoop.datastorage.HConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
+import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.DefaultBagFactory;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.mapReduceLayer.JobControlCompiler;
+import org.apache.pig.impl.mapReduceLayer.LocalLauncher;
+import org.apache.pig.impl.mapReduceLayer.MRCompiler;
+import org.apache.pig.impl.mapReduceLayer.MapReduceLauncher;
+import org.apache.pig.impl.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
+import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POFilter;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POForEach;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POGenerate;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POGlobalRearrange;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POLoad;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POLocalRearrange;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POPackage;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POSplit;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POStore;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POUnion;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ConstantExpression;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POCast;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.ComparisonOperator;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.test.utils.GenPhyOp;
+import org.apache.pig.test.utils.GenRandomData;
+import org.apache.pig.test.utils.TestHelper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestLocalJobSubmission extends junit.framework.TestCase{
+    static PigContext pc;
+    String ldFile;
+    String expFile;
+    PhysicalPlan<PhysicalOperator> php = new PhysicalPlan<PhysicalOperator>();
+    String stFile;
+    String grpName;
+    String curDir;
+    String outDir;
+    String inpDir;
+    String golDir;
+    Random r = new Random();
+
+    // No MiniCluster is started here (intentionally); the tests use the
+    // default PigContext.
+    static {
+        pc = new PigContext();
+        try {
+            pc.connect();
+        } catch (ExecException e) {
+            // Fail class initialization with the real cause instead of letting
+            // every test fail later against an unconnected context.
+            throw new ExceptionInInitializerError(e);
+        }
+        GenPhyOp.setPc(pc);
+    }
+
+
+
+ /*@Override
+ protected void finalize() throws Throwable {
+ super.finalize();
+ if(outDir!=null && new File(outDir).exists())
+ rmrf(outDir);
+ }*/
+
+    /**
+     * Resolves the input, output and golden-file directories under the
+     * working directory and makes sure the output directory exists.
+     *
+     * @throws Exception if the output directory cannot be created
+     */
+    @Before
+    public void setUp() throws Exception {
+        curDir = System.getProperty("user.dir");
+        inpDir = curDir + File.separatorChar + "test/org/apache/pig/test/data/InputFiles/";
+        outDir = curDir + File.separatorChar + "test/org/apache/pig/test/data/OutputFiles/";
+        golDir = curDir + File.separatorChar + "test/org/apache/pig/test/data/GoldenFiles/";
+        File f = new File(outDir);
+        // mkdirs() also returns false when the directory already exists, so
+        // only fail if the directory is still missing afterwards.
+        if (!f.mkdirs() && !f.isDirectory()) {
+            throw new Exception("Could not Create Directory " + outDir);
+        }
+    }
+
+ private void rmrf(String dir) throws Exception{
+ File f = new File(dir);
+ File[] ls = f.listFiles();
+ for (File file : ls) {
+ if(file.isFile())
+ file.delete();
+ else
+ rmrf(file.getPath());
+ }
+ boolean isDeleted = f.delete();
+ if(!isDeleted)
+ throw new Exception("Could not Delete Directory" + dir);
+ }
+
+    /** Removes the output directory created by setUp(), if it exists. */
+    @After
+    public void tearDown() throws Exception {
+        if (outDir != null && new File(outDir).exists()) {
+            rmrf(outDir);
+        }
+    }
+
+    /**
+     * Writes {@code numTuples} random tuples to {@link #ldFile} using PigStorage.
+     * <p>
+     * The random bag is placed in column 0 of a holder tuple and projected
+     * (overloaded, result type TUPLE) as the single input of a POStore.
+     */
+    private void generateInput(int numTuples) throws ExecException {
+        Tuple holder = new DefaultTuple();
+        holder.append(GenRandomData.genRandSmallTupDataBag(r, numTuples, 1000));
+
+        POProject project = new POProject(new OperatorKey("", r.nextLong()));
+        project.attachInput(holder);
+        project.setColumn(0);
+        project.setOverloaded(true);
+        project.setResultType(DataType.TUPLE);
+
+        List<PhysicalOperator> storeInputs = new ArrayList<PhysicalOperator>();
+        storeInputs.add(project);
+
+        POStore store = new POStore(new OperatorKey("", r.nextLong()));
+        store.setInputs(storeInputs);
+        store.setSFile(new FileSpec(ldFile, PigStorage.class.getName()));
+        store.setPc(pc);
+        store.store();
+    }
+
+    /**
+     * Prepares test 1: identity load -> store of jsTst1.txt.
+     *
+     * @param gen if true, only regenerate the input file (100 tuples)
+     */
+    private void setUp1(boolean gen) throws Exception {
+        ldFile = "file:" + inpDir + "jsTst1.txt";
+        expFile = ldFile;
+        stFile = "file:" + outDir + "jsTst1";
+        grpName = "jobSubTst1";
+
+        if (gen) {
+            generateInput(100);
+            return;
+        }
+
+        String storage = PigStorage.class.getName();
+        POLoad load = new POLoad(new OperatorKey("", r.nextLong()));
+        POStore store = new POStore(new OperatorKey("", r.nextLong()));
+        load.setPc(pc);
+        load.setLFile(new FileSpec(ldFile, storage));
+        store.setPc(pc);
+        store.setSFile(new FileSpec(stFile, storage));
+
+        // load -> store
+        php.add(load);
+        php.add(store);
+        php.connect(load, store);
+    }
+
+    /** Runs the identity plan from setUp1 and checks the output equals the input. */
+    @Test
+    public void testCompile1() throws Exception {
+        boolean gen = false; // flip to true to regenerate the input file
+
+        setUp1(gen);
+        if (gen) {
+            return;
+        }
+
+        submit();
+
+        assertTrue("Output " + stFile + " was not created",
+                FileLocalizer.fileExists(stFile, pc));
+
+        FileSpec fSpecExp = new FileSpec(expFile, PigStorage.class.getName());
+        FileSpec fSpecAct = new FileSpec(stFile, PigStorage.class.getName());
+        assertTrue("Output " + stFile + " differs from " + expFile,
+                TestHelper.areFilesSame(fSpecExp, fSpecAct, pc));
+    }
+
+    /**
+     * Prepares test 2: identity load -> store of jsTst2.txt.
+     *
+     * @param gen if true, only regenerate the input file (1000 tuples)
+     */
+    private void setUp2(boolean gen) throws Exception {
+        ldFile = "file:" + inpDir + "jsTst2.txt";
+        expFile = ldFile;
+        stFile = "file:" + outDir + "jsTst2";
+        grpName = "jobSubTst2";
+
+        if (gen) {
+            generateInput(1000);
+            return;
+        }
+
+        String storage = PigStorage.class.getName();
+        POLoad load = new POLoad(new OperatorKey("", r.nextLong()));
+        POStore store = new POStore(new OperatorKey("", r.nextLong()));
+        load.setPc(pc);
+        load.setLFile(new FileSpec(ldFile, storage));
+        store.setPc(pc);
+        store.setSFile(new FileSpec(stFile, storage));
+
+        // load -> store
+        php.add(load);
+        php.add(store);
+        php.connect(load, store);
+    }
+
+    /** Runs the identity plan from setUp2 and checks the output equals the input. */
+    @Test
+    public void testCompile2() throws Exception {
+        boolean gen = false; // flip to true to regenerate the input file
+
+        setUp2(gen);
+        if (gen) {
+            return;
+        }
+
+        submit();
+
+        assertTrue("Output " + stFile + " was not created",
+                FileLocalizer.fileExists(stFile, pc));
+
+        FileSpec fSpecExp = new FileSpec(expFile, PigStorage.class.getName());
+        FileSpec fSpecAct = new FileSpec(stFile, PigStorage.class.getName());
+        assertTrue("Output " + stFile + " differs from " + expFile,
+                TestHelper.areFilesSame(fSpecExp, fSpecAct, pc));
+    }
+
+    /**
+     * Prepares test 3: load jsTst1.txt -> foreach(fields 0,1) ->
+     * filter(field 1 < 500) -> store, compared against golden file jsTst3.
+     *
+     * @param gen if true, only regenerate the input file (1000 tuples)
+     */
+    private void setUp3(boolean gen) throws Exception {
+        ldFile = "file:" + inpDir + "jsTst1.txt";
+        expFile = "file:" + golDir + "jsTst3";
+        stFile = "file:" + outDir + "jsTst3";
+        grpName = "jobSubTst3";
+
+        if (gen) {
+            generateInput(1000);
+            return;
+        }
+
+        FileSpec LFSpec = new FileSpec(ldFile, PigStorage.class.getName());
+        FileSpec SFSpec = new FileSpec(stFile, PigStorage.class.getName());
+
+        POLoad ld = new POLoad(new OperatorKey("", r.nextLong()));
+        POStore st = new POStore(new OperatorKey("", r.nextLong()));
+        ld.setPc(pc);
+        ld.setLFile(LFSpec);
+        st.setPc(pc);
+        st.setSFile(SFSpec);
+
+        int[] flds = {0, 1};
+        // Sample tuple describing the field types (chararray, int).
+        Tuple sample = new DefaultTuple();
+        sample.append("S");
+        sample.append(10);
+
+        POForEach fe = GenPhyOp.topForEachOPWithPlan(flds, sample);
+        POFilter fl = GenPhyOp.topFilterOpWithProj(1, 500, GenPhyOp.LT);
+
+        // load -> foreach -> filter -> store
+        php.add(ld);
+        php.add(fe);
+        php.connect(ld, fe);
+
+        php.add(fl);
+        php.connect(fe, fl);
+
+        php.add(st);
+        php.connect(fl, st);
+    }
+
+ /**
+ * End-to-end check for the foreach/filter plan built by setUp3: submits it and
+ * compares the stored output with the golden file. The golden file is read with
+ * a ',' delimiter, while the job output is read with PigStorage's default.
+ */
+ @Test
+ public void testCompile3() throws Exception {
+ final boolean generateOnly = false;
+ setUp3(generateOnly);
+ if (!generateOnly) {
+ submit();
+ assertEquals(true, FileLocalizer.fileExists(stFile, pc));
+
+ String storage = PigStorage.class.getName();
+ FileSpec expected = new FileSpec(expFile, storage + "(',')");
+ FileSpec actual = new FileSpec(stFile, storage);
+ assertEquals(true, TestHelper.areFilesSame(expected, actual, pc));
+ }
+ }
+
+ /**
+ * Configures test 4: load jsTst1.txt, split it into two filters on field 1
+ * (built with LT 200 and GT 800, both with a cast), union the two branches and
+ * store the result for comparison with golDir/jsTst4.
+ *
+ * @param gen when true, only call generateInput(1000) and return without building a plan
+ */
+ private void setUp4(boolean gen) throws Exception {
+ ldFile = "file:" + inpDir + "jsTst1.txt";
+ expFile = "file:" + golDir + "jsTst4";
+ stFile = "file:" + outDir + "jsTst4";
+ grpName = "jobSubTst4";
+
+ if (gen) {
+ generateInput(1000);
+ return;
+ }
+
+ String storage = PigStorage.class.getName();
+ FileSpec loadSpec = new FileSpec(ldFile, storage);
+ FileSpec storeSpec = new FileSpec(stFile, storage);
+
+ POLoad load = new POLoad(new OperatorKey("", r.nextLong()));
+ POStore store = new POStore(new OperatorKey("", r.nextLong()));
+ load.setPc(pc);
+ load.setLFile(loadSpec);
+ store.setPc(pc);
+ store.setSFile(storeSpec);
+
+ // Two branches off the split; presumably field 1 < 200 and field 1 > 800.
+ POSplit split = GenPhyOp.topSplitOp();
+ POFilter lowBranch = GenPhyOp.topFilterOpWithProjWithCast(1, 200, GenPhyOp.LT);
+ POFilter highBranch = GenPhyOp.topFilterOpWithProjWithCast(1, 800, GenPhyOp.GT);
+ POUnion union = GenPhyOp.topUnionOp();
+
+ // load -> split
+ php.add(load);
+ php.add(split);
+ php.connect(load, split);
+ // split -> each filter branch
+ php.add(lowBranch);
+ php.connect(split, lowBranch);
+ php.add(highBranch);
+ php.connect(split, highBranch);
+ // both branches -> union -> store
+ php.add(union);
+ php.connect(lowBranch, union);
+ php.connect(highBranch, union);
+ php.add(store);
+ php.connect(union, store);
+ }
+
+ /**
+ * End-to-end check for the split/filter/union plan built by setUp4: submits it
+ * and compares the stored output with the golden file. The golden file is read
+ * with a ',' delimiter, while the job output is read with PigStorage's default.
+ */
+ @Test
+ public void testCompile4() throws Exception {
+ final boolean generateOnly = false;
+ setUp4(generateOnly);
+ if (!generateOnly) {
+ submit();
+ assertEquals(true, FileLocalizer.fileExists(stFile, pc));
+
+ String storage = PigStorage.class.getName();
+ FileSpec expected = new FileSpec(expFile, storage + "(',')");
+ FileSpec actual = new FileSpec(stFile, storage);
+ assertEquals(true, TestHelper.areFilesSame(expected, actual, pc));
+ }
+ }
+
+ /**
+ * Configures test 5: load jsTst5.txt with a ',' delimiter and run it through
+ * local rearrange -> global rearrange -> package (integer key, one input, not
+ * inner) -> a foreach from topForEachOPWithPlan(1) -> store. The stored output
+ * is compared against the input file itself.
+ *
+ * @param gen when true, only call generateInput(1000) and return without building a plan
+ */
+ private void setUp5(boolean gen) throws Exception {
+ ldFile = "file:" + inpDir + "jsTst5.txt";
+ expFile = ldFile;
+ stFile = "file:" + outDir + "jsTst5";
+ grpName = "jobSubTst5";
+
+ if (gen) {
+ generateInput(1000);
+ return;
+ }
+
+ String storage = PigStorage.class.getName();
+ FileSpec loadSpec = new FileSpec(ldFile, storage + "(',')");
+ FileSpec storeSpec = new FileSpec(stFile, storage);
+
+ POLoad load = new POLoad(new OperatorKey("", r.nextLong()));
+ POStore store = new POStore(new OperatorKey("", r.nextLong()));
+ load.setPc(pc);
+ load.setLFile(loadSpec);
+ store.setPc(pc);
+ store.setSFile(storeSpec);
+
+ // Sample row (chararray, int) handed to GenPhyOp for the rearrange's key plan.
+ Tuple sample = new DefaultTuple();
+ sample.append("S");
+ sample.append(1);
+ POLocalRearrange localRearrange = GenPhyOp.topLocalRearrangeOPWithPlan(0, 1, sample);
+
+ POGlobalRearrange globalRearrange = GenPhyOp.topGlobalRearrangeOp();
+
+ POPackage pack = GenPhyOp.topPackageOp();
+ pack.setKeyType(DataType.INTEGER);
+ pack.setNumInps(1);
+ pack.setInner(new boolean[] {false});
+
+ POForEach forEach = GenPhyOp.topForEachOPWithPlan(1);
+
+ php.add(load);
+ php.add(localRearrange);
+ php.connect(load, localRearrange);
+ php.add(globalRearrange);
+ php.connect(localRearrange, globalRearrange);
+ php.add(pack);
+ php.connect(globalRearrange, pack);
+ php.add(forEach);
+ php.connect(pack, forEach);
+ php.add(store);
+ php.connect(forEach, store);
+ }
+
+ /**
+ * End-to-end check for the rearrange/package plan built by setUp5: submits it
+ * and compares the stored output with expFile (the ','-delimited input file).
+ */
+ @Test
+ public void testCompile5() throws Exception {
+ final boolean generateOnly = false;
+ setUp5(generateOnly);
+ if (!generateOnly) {
+ submit();
+ assertEquals(true, FileLocalizer.fileExists(stFile, pc));
+
+ String storage = PigStorage.class.getName();
+ FileSpec expected = new FileSpec(expFile, storage + "(',')");
+ FileSpec actual = new FileSpec(stFile, storage);
+ assertEquals(true, TestHelper.areFilesSame(expected, actual, pc));
+ }
+ }
+
+ /**
+ * Asserts that the load file exists, then runs the current plan php through a
+ * LocalLauncher, passing grpName (presumably the job/group name) and pc.
+ */
+ private void submit() throws Exception{
+ boolean inputExists = FileLocalizer.fileExists(ldFile, pc);
+ assertEquals(true, inputExists);
+ new LocalLauncher().launchPig(php, grpName, pc);
+ }
+}
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java?rev=656913&r1=656912&r2=656913&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java Thu May 15 18:09:30 2008
@@ -30,11 +30,15 @@
import org.apache.pig.data.DefaultTuple;
import org.apache.pig.data.IndexedTuple;
import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.physicalLayer.POStatus;
import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POLocalRearrange;
import org.apache.pig.impl.physicalLayer.topLevelOperators.POLocalRearrange;
import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject;
+import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.test.utils.GenPhyOp;
import org.apache.pig.test.utils.GenRandomData;
import org.apache.pig.test.utils.TestHelper;
@@ -58,6 +62,9 @@
public void setUp() throws Exception {
Random r = new Random();
db = GenRandomData.genRandSmallTupDataBag(r, 10, 100);
+ }
+
+ private void setUp1() throws PlanException, ExecException{
lr = GenPhyOp.topLocalRearrangeOPWithPlanPlain(0,0,db.iterator().next());
POProject proj = GenPhyOp.exprProject();
proj.setColumn(0);
@@ -76,7 +83,8 @@
}
@Test
- public void testGetNextTuple() throws ExecException, IOException {
+ public void testGetNextTuple1() throws ExecException, PlanException {
+ setUp1();
int size=0;
for(Result res=lr.getNext(t);res.returnStatus!=POStatus.STATUS_EOP;res=lr.getNext(t)){
Tuple t = (Tuple)res.result;
@@ -96,5 +104,50 @@
//check if all the tuples in the input are generated
assertEquals(true, size==db.size());
}
+
+ /**
+ * Builds a local rearrange with two key plans. The first comes from
+ * topLocalRearrangeOPWithPlanPlain(0, 0, ...). The second is the first plan of
+ * another rearrange built with (0, 1, ...), presumably projecting column 1.
+ * The rearrange's single input is an overloaded column-0 projection over a
+ * tuple wrapping db, presumably streaming db's tuples.
+ */
+ private void setUp2() throws PlanException, ExecException{
+ lr = GenPhyOp.topLocalRearrangeOPWithPlanPlain(0,0,db.iterator().next());
+ List<ExprPlan> keyPlans = lr.getPlans();
+ POLocalRearrange donor = GenPhyOp.topLocalRearrangeOPWithPlanPlain(0, 1, db.iterator().next());
+ keyPlans.add(donor.getPlans().get(0));
+ lr.setPlans(keyPlans);
+
+ POProject source = GenPhyOp.exprProject();
+ source.setColumn(0);
+ source.setResultType(DataType.TUPLE);
+ source.setOverloaded(true);
+ Tuple wrapper = new DefaultTuple();
+ wrapper.append(db);
+ source.attachInput(wrapper);
+
+ List<PhysicalOperator> upstream = new ArrayList<PhysicalOperator>();
+ upstream.add(source);
+ lr.setInputs(upstream);
+ }
+
+ /**
+ * Drains the two-key local rearrange from setUp2 and checks each output:
+ * the value's index is 0, the value is a tuple from db, and the key equals
+ * fields 0 and 1 of the value. Finally checks that one output was produced per
+ * input tuple.
+ */
+ @Test
+ public void testGetNextTuple2() throws ExecException, PlanException {
+ setUp2();
+ int produced = 0;
+ Result res = lr.getNext(t);
+ while (res.returnStatus != POStatus.STATUS_EOP) {
+ Tuple out = (Tuple) res.result;
+ IndexedTuple value = (IndexedTuple) out.get(1);
+
+ // the index must match the single input's index (0)
+ assertEquals((float)0, (float)value.index, 0.01f);
+
+ // the value tuple must come from the input bag
+ assertEquals(true, TestHelper.bagContains(db, value.toTuple()));
+
+ // the emitted key must equal (field 0, field 1) of the value
+ Tuple expectedKey = TupleFactory.getInstance().newTuple(2);
+ expectedKey.set(0, value.toTuple().get(0));
+ expectedKey.set(1, value.toTuple().get(1));
+ assertEquals(true, expectedKey.compareTo((Tuple) out.get(0)) == 0);
+
+ ++produced;
+ res = lr.getNext(t);
+ }
+
+ // every input tuple must have been emitted
+ assertEquals(true, produced == db.size());
+ }
}
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestMRCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestMRCompiler.java?rev=656913&r1=656912&r2=656913&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestMRCompiler.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestMRCompiler.java Thu May 15 18:09:30 2008
@@ -17,8 +17,6 @@
*/
package org.apache.pig.test;
-import static org.junit.Assert.assertEquals;
-
import java.io.ByteArrayOutputStream;
import java.io.FileInputStream;
import java.io.FileOutputStream;
@@ -29,23 +27,39 @@
import java.util.List;
import java.util.Random;
+import org.apache.pig.ComparisonFunc;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.builtin.AVG;
+import org.apache.pig.builtin.SUM;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.builtin.FindQuantiles;
+import org.apache.pig.impl.builtin.GFCross;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
import org.apache.pig.impl.mapReduceLayer.MRCompiler;
import org.apache.pig.impl.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.impl.physicalLayer.plans.PlanPrinter;
import org.apache.pig.impl.physicalLayer.topLevelOperators.POFilter;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POForEach;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POGenerate;
import org.apache.pig.impl.physicalLayer.topLevelOperators.POGlobalRearrange;
import org.apache.pig.impl.physicalLayer.topLevelOperators.POLoad;
import org.apache.pig.impl.physicalLayer.topLevelOperators.POLocalRearrange;
import org.apache.pig.impl.physicalLayer.topLevelOperators.POPackage;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POSort;
import org.apache.pig.impl.physicalLayer.topLevelOperators.POSplit;
import org.apache.pig.impl.physicalLayer.topLevelOperators.POStore;
import org.apache.pig.impl.physicalLayer.topLevelOperators.POUnion;
import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
-import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POUserComparisonFunc;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POUserFunc;
import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.test.TestPOSort.WeirdComparator;
import org.apache.pig.test.utils.GenPhyOp;
import org.junit.After;
import org.junit.Before;
@@ -54,7 +68,7 @@
public class TestMRCompiler extends junit.framework.TestCase {
static PhysicalPlan<PhysicalOperator> php = new PhysicalPlan<PhysicalOperator>();
- MiniCluster cluster = MiniCluster.buildCluster();
+// MiniCluster cluster = MiniCluster.buildCluster();
static PigContext pc;
@@ -64,6 +78,7 @@
static final long SEED = 1013;
+ static Random r;
static{
pc = new PigContext();
try {
@@ -72,14 +87,16 @@
// TODO Auto-generated catch block
e.printStackTrace();
}
+ r = new Random(SEED);
}
@Before
public void setUp() throws ExecException {
- GenPhyOp.setR(new Random(SEED));
+ GenPhyOp.setR(r);
GenPhyOp.setPc(pc);
- int numTests = 14;
+ int numTests = 15;
+// int numTests = 9;
tests = new String[numTests];
int cnt = -1;
@@ -89,6 +106,7 @@
tests[++cnt] = "intTestRun2";
for (int i = 1; i <= 3; i++)
tests[++cnt] = "intTestSpl" + i;
+ tests[++cnt] = "intTestSortUDF1";
}
@After
@@ -112,7 +130,9 @@
PhysicalPlan<PhysicalOperator> part1 = new PhysicalPlan<PhysicalOperator>();
POLoad lC = GenPhyOp.topLoadOp();
POFilter fC = GenPhyOp.topFilterOp();
+ fC.setRequestedParallelism(20);
POLocalRearrange lrC = GenPhyOp.topLocalRearrangeOp();
+ lrC.setRequestedParallelism(10);
POGlobalRearrange grC = GenPhyOp.topGlobalRearrangeOp();
POPackage pkC = GenPhyOp.topPackageOp();
part1.add(lC);
@@ -126,7 +146,9 @@
part1.connect(grC, pkC);
POPackage pkD = GenPhyOp.topPackageOp();
+ pkD.setRequestedParallelism(20);
POLocalRearrange lrD = GenPhyOp.topLocalRearrangeOp();
+ lrD.setRequestedParallelism(30);
POGlobalRearrange grD = GenPhyOp.topGlobalRearrangeOp();
POLoad lD = GenPhyOp.topLoadOp();
part1.add(lD);
@@ -399,7 +421,9 @@
php.connect(lA, spl);
POFilter fl1 = GenPhyOp.topFilterOp();
+ fl1.setRequestedParallelism(10);
POFilter fl2 = GenPhyOp.topFilterOp();
+ fl2.setRequestedParallelism(20);
php.add(fl1);
php.add(fl2);
php.connect(spl, fl1);
@@ -413,8 +437,11 @@
php.connect(fl2, sp21);
POFilter fl11 = GenPhyOp.topFilterOp();
+ fl11.setRequestedParallelism(10);
POFilter fl21 = GenPhyOp.topFilterOp();
+ fl21.setRequestedParallelism(20);
POFilter fl22 = GenPhyOp.topFilterOp();
+ fl22.setRequestedParallelism(30);
php.add(fl11);
php.add(fl21);
php.add(fl22);
@@ -423,8 +450,11 @@
php.connect(sp21, fl22);
POLocalRearrange lr1 = GenPhyOp.topLocalRearrangeOp();
+ lr1.setRequestedParallelism(40);
POLocalRearrange lr21 = GenPhyOp.topLocalRearrangeOp();
+ lr21.setRequestedParallelism(15);
POLocalRearrange lr22 = GenPhyOp.topLocalRearrangeOp();
+ lr22.setRequestedParallelism(35);
php.add(lr1);
php.add(lr21);
php.add(lr22);
@@ -436,13 +466,16 @@
php.addAsLeaf(gr);
POPackage pk = GenPhyOp.topPackageOp();
+ pk.setRequestedParallelism(25);
php.addAsLeaf(pk);
POSplit sp2 = GenPhyOp.topSplitOp();
php.addAsLeaf(sp2);
POFilter fl3 = GenPhyOp.topFilterOp();
+ fl3.setRequestedParallelism(100);
POFilter fl4 = GenPhyOp.topFilterOp();
+ fl4.setRequestedParallelism(80);
php.add(fl3);
php.add(fl4);
php.connect(sp2, fl3);
@@ -687,6 +720,86 @@
POStore st = GenPhyOp.topStoreOp();
php.addAsLeaf(st);
}
+
+ /**
+ * Builds the plan for the intTestSortUDF1 case in the static php. A loaded
+ * filter feeds a POSort that uses the WeirdComparator user comparison function
+ * (requested parallelism 20). The sort is followed by foreach(FindQuantiles, SUM)
+ * -> group chain -> foreach(AVG) -> group chain -> foreach(GFCross) -> store.
+ *
+ * NOTE(review): operator keys come from the shared Random r, which is seeded with
+ * SEED and also handed to GenPhyOp in setUp. Reordering statements here presumably
+ * changes the generated plan and its golden-file output, so keep the order.
+ */
+ public static void intTestSortUDF1() throws PlanException, ExecException{
+ php = new PhysicalPlan<PhysicalOperator>();
+ PhysicalPlan<PhysicalOperator> ldFil1 = GenPhyOp.loadedFilter();
+ php.merge(ldFil1);
+
+ // Resolves to the nested TestMRCompiler.WeirdComparator, which shadows the
+ // imported TestPOSort.WeirdComparator inside this class.
+ String funcName = WeirdComparator.class.getName();
+ POUserFunc comparator = new POUserComparisonFunc(
+ new OperatorKey("", r.nextLong()), -1, null, funcName);
+ POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, ldFil1.getLeaves(),
+ null, null, comparator);
+ sort.setRequestedParallelism(20);
+ // Nested sort plan: an overloaded column-1 projection (tuple result) feeding a
+ // star projection.
+ ExprPlan nesSortPlan = new ExprPlan();
+ POProject topPrj = new POProject(new OperatorKey("", r.nextLong()));
+ topPrj.setColumn(1);
+ topPrj.setOverloaded(true);
+ topPrj.setResultType(DataType.TUPLE);
+ nesSortPlan.add(topPrj);
+
+ POProject prjStar2 = new POProject(new OperatorKey("", r.nextLong()));
+ prjStar2.setResultType(DataType.TUPLE);
+ prjStar2.setStar(true);
+ nesSortPlan.add(prjStar2);
+
+ nesSortPlan.connect(topPrj, prjStar2);
+ List<ExprPlan> nesSortPlanLst = new ArrayList<ExprPlan>();
+ nesSortPlanLst.add(nesSortPlan);
+
+ sort.setSortPlans(nesSortPlanLst);
+
+ // attach the sort below the loaded filter's (first) leaf
+ php.add(sort);
+ php.connect(ldFil1.getLeaves().get(0), sort);
+
+ // NOTE(review): udfs is cleared and refilled after each topForEachOPWithUDF call.
+ // This assumes GenPhyOp does not keep a reference to the list; confirm.
+ List<String> udfs = new ArrayList<String>();
+ udfs.add(FindQuantiles.class.getName());
+ udfs.add(SUM.class.getName());
+ POForEach fe3 = GenPhyOp.topForEachOPWithUDF(udfs);
+ php.add(fe3);
+ php.connect(sort, fe3);
+
+ PhysicalPlan<PhysicalOperator> grpChain1 = GenPhyOp.grpChain();
+ php.merge(grpChain1);
+ php.connect(fe3,grpChain1.getRoots().get(0));
+
+ udfs.clear();
+ udfs.add(AVG.class.getName());
+ POForEach fe4 = GenPhyOp.topForEachOPWithUDF(udfs);
+ php.addAsLeaf(fe4);
+
+ PhysicalPlan<PhysicalOperator> grpChain2 = GenPhyOp.grpChain();
+ php.merge(grpChain2);
+ php.connect(fe4,grpChain2.getRoots().get(0));
+
+ udfs.clear();
+ udfs.add(GFCross.class.getName());
+ POForEach fe5 = GenPhyOp.topForEachOPWithUDF(udfs);
+ php.addAsLeaf(fe5);
+
+ POStore st = GenPhyOp.topStoreOp();
+ php.addAsLeaf(st);
+ }
+
+ /**
+ * Orders tuples by the distance of their integer field 1 from 50, nearest
+ * first. Used as the user comparison function of the POSort in intTestSortUDF1.
+ */
+ public static class WeirdComparator extends ComparisonFunc {
+
+ /**
+ * @return a negative, zero or positive value as t1's field 1 is nearer to,
+ * as near as, or farther from 50 than t2's field 1
+ * @throws RuntimeException wrapping the ExecException if field 1 cannot be read
+ */
+ @Override
+ public int compare(Tuple t1, Tuple t2) {
+ try {
+ // long arithmetic so the squared distances cannot overflow int
+ long d1 = (Integer) t1.get(1) - 50L;
+ long d2 = (Integer) t2.get(1) - 50L;
+ return Long.signum(d1 * d1 - d2 * d2);
+ } catch (ExecException e) {
+ // Previously swallowed and reported as "equal", silently corrupting the
+ // sort order; fail loudly instead.
+ throw new RuntimeException("WeirdComparator could not read field 1", e);
+ }
+ }
+
+ }
public static void generate() throws SecurityException, NoSuchMethodException,
IllegalArgumentException, IllegalAccessException,
@@ -713,7 +826,7 @@
ppp.print(baos);
- FileOutputStream fos = new FileOutputStream("intTest/org/apache/pig/test/data/GoldenFiles/MRC"
+ FileOutputStream fos = new FileOutputStream("test/org/apache/pig/test/data/GoldenFiles/MRC"
+ (i + 1) + ".gld");
fos.write(baos.toByteArray());
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java?rev=656913&r1=656912&r2=656913&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java Thu May 15 18:09:30 2008
@@ -34,191 +34,191 @@
import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
import org.apache.pig.impl.physicalLayer.topLevelOperators.PORead;
import org.apache.pig.impl.physicalLayer.topLevelOperators.POSort;
-import org.apache.pig.impl.physicalLayer.topLevelOperators.POUserComparisonFunc;
-import org.apache.pig.impl.physicalLayer.topLevelOperators.POUserFunc;
import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POUserComparisonFunc;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POUserFunc;
import org.apache.pig.test.utils.GenRandomData;
import org.junit.Test;
public class TestPOSort extends TestCase {
- Random r = new Random();
- int MAX_TUPLES = 10;
+ // Unseeded generator: each run sorts a different random bag and uses fresh operator keys.
+ Random r = new Random();
+ // Passed to GenRandomData.genRandSmallTupDataBag as the bag size (presumably the tuple count).
+ int MAX_TUPLES = 10;
- @Test
- public void testPOSortAscString() throws ExecException {
- DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
- MAX_TUPLES, 100);
- List<ExprPlan> sortPlans = new LinkedList<ExprPlan>();
- POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
- pr1.setResultType(DataType.CHARARRAY);
- ExprPlan expPlan = new ExprPlan();
- expPlan.add(pr1);
- sortPlans.add(expPlan);
- List<Boolean> mAscCols = new LinkedList<Boolean>();
- mAscCols.add(true);
- PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
- List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
- inputs.add(read);
- POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
- sortPlans, mAscCols, null);
- Tuple t = null;
- Result res1 = sort.getNext(t);
- // System.out.println(res1.result);
- Result res2 = sort.getNext(t);
- while (res2.returnStatus != POStatus.STATUS_EOP) {
- Object i1 = ((Tuple) res1.result).get(0);
- Object i2 = ((Tuple) res2.result).get(0);
- int i = DataType.compare(i1, i2);
- // System.out.println(res2.result + " i = " + i);
- assertEquals(true, (i <= 0));
- res1 = res2;
- res2 = sort.getNext(t);
- }
- }
-
- @Test
- public void testPOSortDescString() throws ExecException {
- DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
- MAX_TUPLES, 100);
- List<ExprPlan> sortPlans = new LinkedList<ExprPlan>();
- POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
- pr1.setResultType(DataType.CHARARRAY);
- ExprPlan expPlan = new ExprPlan();
- expPlan.add(pr1);
- sortPlans.add(expPlan);
- List<Boolean> mAscCols = new LinkedList<Boolean>();
- mAscCols.add(false);
- PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
- List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
- inputs.add(read);
- POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
- sortPlans, mAscCols, null);
- Tuple t = null;
- Result res1 = sort.getNext(t);
- // System.out.println(res1.result);
- Result res2 = sort.getNext(t);
- while (res2.returnStatus != POStatus.STATUS_EOP) {
- Object i1 = ((Tuple) res1.result).get(0);
- Object i2 = ((Tuple) res2.result).get(0);
- int i = DataType.compare(i1, i2);
- // System.out.println(res2.result + " i = " + i);
- assertEquals(true, (i >= 0));
- res1 = res2;
- res2 = sort.getNext(t);
- }
- }
-
- @Test
- public void testPOSortAsc() throws ExecException {
- DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
- MAX_TUPLES, 100);
- List<ExprPlan> sortPlans = new LinkedList<ExprPlan>();
- POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
- pr1.setResultType(DataType.INTEGER);
- ExprPlan expPlan = new ExprPlan();
- expPlan.add(pr1);
- sortPlans.add(expPlan);
- List<Boolean> mAscCols = new LinkedList<Boolean>();
- mAscCols.add(true);
- PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
- List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
- inputs.add(read);
- POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
- sortPlans, mAscCols, null);
- Tuple t = null;
- Result res1 = sort.getNext(t);
- // System.out.println(res1.result);
- Result res2 = sort.getNext(t);
- while (res2.returnStatus != POStatus.STATUS_EOP) {
- Object i1 = ((Tuple) res1.result).get(1);
- Object i2 = ((Tuple) res2.result).get(1);
- int i = DataType.compare(i1, i2);
- assertEquals(true, (i <= 0));
- // System.out.println(res2.result);
- res1 = res2;
- res2 = sort.getNext(t);
- }
- }
-
- @Test
- public void testPOSortDesc() throws ExecException {
- DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
- MAX_TUPLES, 100);
- List<ExprPlan> sortPlans = new LinkedList<ExprPlan>();
- POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
- pr1.setResultType(DataType.INTEGER);
- ExprPlan expPlan = new ExprPlan();
- expPlan.add(pr1);
- sortPlans.add(expPlan);
- List<Boolean> mAscCols = new LinkedList<Boolean>();
- mAscCols.add(false);
- PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
- List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
- inputs.add(read);
- POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
- sortPlans, mAscCols, null);
- Tuple t = null;
- Result res1 = sort.getNext(t);
- // System.out.println(res1.result);
- Result res2 = sort.getNext(t);
- while (res2.returnStatus != POStatus.STATUS_EOP) {
- Object i1 = ((Tuple) res1.result).get(1);
- Object i2 = ((Tuple) res2.result).get(1);
- int i = DataType.compare(i1, i2);
- assertEquals(true, (i >= 0));
- // System.out.println(res2.result);
- res1 = res2;
- res2 = sort.getNext(t);
- }
- }
-
- @Test
- public void testPOSortUDF() throws ExecException {
- DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
- MAX_TUPLES, 100);
- PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
- List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
- inputs.add(read);
- String funcName = WeirdComparator.class.getName() + "()";
- /*POUserFunc comparator = new POUserFunc(
- new OperatorKey("", r.nextLong()), -1, inputs, funcName);*/
- POUserFunc comparator = new POUserComparisonFunc(
- new OperatorKey("", r.nextLong()), -1, null, funcName);
- POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
- null, null, comparator);
- Tuple t = null;
- Result res1 = sort.getNext(t);
- // System.out.println(res1.result);
- Result res2 = sort.getNext(t);
- while (res2.returnStatus != POStatus.STATUS_EOP) {
- int i1 = (Integer) ((Tuple) res1.result).get(1);
- int i2 = (Integer) ((Tuple) res2.result).get(1);
- int i = (i1 - 50) * (i1 - 50) - (i2 - 50) * (i2 - 50);
- assertEquals(true, (i <= 0));
- System.out.println(i + " : " + res2.result);
- res1 = res2;
- res2 = sort.getNext(t);
- }
- }
-
- // sorts values in ascending order of their distance from 50
- public static class WeirdComparator extends ComparisonFunc {
-
- @Override
- public int compare(Tuple t1, Tuple t2) {
- // TODO Auto-generated method stub
- int result = 0;
- try {
- int i1 = (Integer) t1.get(1);
- int i2 = (Integer) t2.get(1);
- result = (i1 - 50) * (i1 - 50) - (i2 - 50) * (i2 - 50);
- } catch (ExecException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- return result;
- }
+ /**
+ * Sorts a random bag ascending on its chararray column 0 and checks that every
+ * adjacent pair of output tuples is in non-decreasing order.
+ */
+ @Test
+ public void testPOSortAscString() throws ExecException {
+ DataBag bag = (DataBag) GenRandomData.genRandSmallTupDataBag(r, MAX_TUPLES, 100);
+
+ POProject keyColumn = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+ keyColumn.setResultType(DataType.CHARARRAY);
+ ExprPlan keyPlan = new ExprPlan();
+ keyPlan.add(keyColumn);
+ List<ExprPlan> keyPlans = new LinkedList<ExprPlan>();
+ keyPlans.add(keyPlan);
+
+ List<Boolean> ascending = new LinkedList<Boolean>();
+ ascending.add(true);
+
+ List<PhysicalOperator> sources = new LinkedList<PhysicalOperator>();
+ sources.add(new PORead(new OperatorKey("", r.nextLong()), bag));
+ POSort sorter = new POSort(new OperatorKey("", r.nextLong()), -1, sources,
+ keyPlans, ascending, null);
+
+ Tuple dummy = null;
+ Result prev = sorter.getNext(dummy);
+ for (Result cur = sorter.getNext(dummy); cur.returnStatus != POStatus.STATUS_EOP; cur = sorter.getNext(dummy)) {
+ int order = DataType.compare(((Tuple) prev.result).get(0), ((Tuple) cur.result).get(0));
+ assertEquals(true, (order <= 0));
+ prev = cur;
+ }
+ }
+
+ /**
+ * Sorts a random bag descending on its chararray column 0 and checks that every
+ * adjacent pair of output tuples is in non-increasing order.
+ */
+ @Test
+ public void testPOSortDescString() throws ExecException {
+ DataBag bag = (DataBag) GenRandomData.genRandSmallTupDataBag(r, MAX_TUPLES, 100);
+
+ POProject keyColumn = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+ keyColumn.setResultType(DataType.CHARARRAY);
+ ExprPlan keyPlan = new ExprPlan();
+ keyPlan.add(keyColumn);
+ List<ExprPlan> keyPlans = new LinkedList<ExprPlan>();
+ keyPlans.add(keyPlan);
+
+ List<Boolean> ascending = new LinkedList<Boolean>();
+ ascending.add(false);
+
+ List<PhysicalOperator> sources = new LinkedList<PhysicalOperator>();
+ sources.add(new PORead(new OperatorKey("", r.nextLong()), bag));
+ POSort sorter = new POSort(new OperatorKey("", r.nextLong()), -1, sources,
+ keyPlans, ascending, null);
+
+ Tuple dummy = null;
+ Result prev = sorter.getNext(dummy);
+ for (Result cur = sorter.getNext(dummy); cur.returnStatus != POStatus.STATUS_EOP; cur = sorter.getNext(dummy)) {
+ int order = DataType.compare(((Tuple) prev.result).get(0), ((Tuple) cur.result).get(0));
+ assertEquals(true, (order >= 0));
+ prev = cur;
+ }
+ }
+
+ /**
+ * Sorts a random bag ascending on its integer column 1 and checks that every
+ * adjacent pair of output tuples is in non-decreasing order.
+ */
+ @Test
+ public void testPOSortAsc() throws ExecException {
+ DataBag bag = (DataBag) GenRandomData.genRandSmallTupDataBag(r, MAX_TUPLES, 100);
+
+ POProject keyColumn = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
+ keyColumn.setResultType(DataType.INTEGER);
+ ExprPlan keyPlan = new ExprPlan();
+ keyPlan.add(keyColumn);
+ List<ExprPlan> keyPlans = new LinkedList<ExprPlan>();
+ keyPlans.add(keyPlan);
+
+ List<Boolean> ascending = new LinkedList<Boolean>();
+ ascending.add(true);
+
+ List<PhysicalOperator> sources = new LinkedList<PhysicalOperator>();
+ sources.add(new PORead(new OperatorKey("", r.nextLong()), bag));
+ POSort sorter = new POSort(new OperatorKey("", r.nextLong()), -1, sources,
+ keyPlans, ascending, null);
+
+ Tuple dummy = null;
+ Result prev = sorter.getNext(dummy);
+ for (Result cur = sorter.getNext(dummy); cur.returnStatus != POStatus.STATUS_EOP; cur = sorter.getNext(dummy)) {
+ int order = DataType.compare(((Tuple) prev.result).get(1), ((Tuple) cur.result).get(1));
+ assertEquals(true, (order <= 0));
+ prev = cur;
+ }
+ }
+
+ /**
+ * Sorts a random bag descending on its integer column 1 and checks that every
+ * adjacent pair of output tuples is in non-increasing order.
+ */
+ @Test
+ public void testPOSortDesc() throws ExecException {
+ DataBag bag = (DataBag) GenRandomData.genRandSmallTupDataBag(r, MAX_TUPLES, 100);
+
+ POProject keyColumn = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
+ keyColumn.setResultType(DataType.INTEGER);
+ ExprPlan keyPlan = new ExprPlan();
+ keyPlan.add(keyColumn);
+ List<ExprPlan> keyPlans = new LinkedList<ExprPlan>();
+ keyPlans.add(keyPlan);
+
+ List<Boolean> ascending = new LinkedList<Boolean>();
+ ascending.add(false);
+
+ List<PhysicalOperator> sources = new LinkedList<PhysicalOperator>();
+ sources.add(new PORead(new OperatorKey("", r.nextLong()), bag));
+ POSort sorter = new POSort(new OperatorKey("", r.nextLong()), -1, sources,
+ keyPlans, ascending, null);
+
+ Tuple dummy = null;
+ Result prev = sorter.getNext(dummy);
+ for (Result cur = sorter.getNext(dummy); cur.returnStatus != POStatus.STATUS_EOP; cur = sorter.getNext(dummy)) {
+ int order = DataType.compare(((Tuple) prev.result).get(1), ((Tuple) cur.result).get(1));
+ assertEquals(true, (order >= 0));
+ prev = cur;
+ }
+ }
+
+ /**
+ * Sorts a random bag with the WeirdComparator user comparison function and
+ * checks that the distance of field 1 from 50 never decreases between
+ * consecutive output tuples.
+ */
+ @Test
+ public void testPOSortUDF() throws ExecException {
+ DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
+ MAX_TUPLES, 100);
+ PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
+ List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+ inputs.add(read);
+ String funcName = WeirdComparator.class.getName() + "()";
+ POUserFunc comparator = new POUserComparisonFunc(
+ new OperatorKey("", r.nextLong()), -1, null, funcName);
+ POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
+ null, null, comparator);
+ Tuple t = null;
+ Result res1 = sort.getNext(t);
+ Result res2 = sort.getNext(t);
+ while (res2.returnStatus != POStatus.STATUS_EOP) {
+ int i1 = (Integer) ((Tuple) res1.result).get(1);
+ int i2 = (Integer) ((Tuple) res2.result).get(1);
+ // squared distance from 50 must be non-decreasing along the output
+ int i = (i1 - 50) * (i1 - 50) - (i2 - 50) * (i2 - 50);
+ assertEquals(true, (i <= 0));
+ // (leftover debug println removed; sibling tests keep their prints commented out)
+ res1 = res2;
+ res2 = sort.getNext(t);
+ }
+ }
+
+ /**
+ * Sorts tuples in ascending order of the distance of their integer field 1
+ * from 50. Used as the user comparison function in testPOSortUDF.
+ */
+ public static class WeirdComparator extends ComparisonFunc {
+
+ /**
+ * @return a negative, zero or positive value as t1's field 1 is nearer to,
+ * as near as, or farther from 50 than t2's field 1
+ * @throws RuntimeException wrapping the ExecException if field 1 cannot be read
+ */
+ @Override
+ public int compare(Tuple t1, Tuple t2) {
+ try {
+ // long arithmetic so the squared distances cannot overflow int
+ long d1 = (Integer) t1.get(1) - 50L;
+ long d2 = (Integer) t2.get(1) - 50L;
+ return Long.signum(d1 * d1 - d2 * d2);
+ } catch (ExecException e) {
+ // Previously swallowed and reported as "equal", silently corrupting the
+ // sort order; fail loudly instead.
+ throw new RuntimeException("WeirdComparator could not read field 1", e);
+ }
+ }
- }
+ }
}