You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/04/17 23:39:30 UTC
svn commit: r649290 - in /incubator/pig/branches/types: ./
src/org/apache/pig/impl/physicalLayer/plans/
src/org/apache/pig/impl/physicalLayer/topLevelOperators/
test/org/apache/pig/test/ test/org/apache/pig/test/utils/
Author: gates
Date: Thu Apr 17 14:39:26 2008
New Revision: 649290
URL: http://svn.apache.org/viewvc?rev=649290&view=rev
Log:
incr4 patch from Shravan. Adds POForEach, POLocalRearrange and POPackage, and makes suggested changes to POStore and POLoad.
Added:
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POForEach.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POLocalRearrange.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POPackage.java
incubator/pig/branches/types/test/org/apache/pig/test/TestForEach.java
incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java
incubator/pig/branches/types/test/org/apache/pig/test/TestPackage.java
Modified:
incubator/pig/branches/types/build.xml
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhysicalPlan.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POLoad.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POStore.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PhysicalOperator.java
incubator/pig/branches/types/test/org/apache/pig/test/TestGreaterThan.java
incubator/pig/branches/types/test/org/apache/pig/test/TestPOGenerate.java
incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java
incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java
Modified: incubator/pig/branches/types/build.xml
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/build.xml?rev=649290&r1=649289&r2=649290&view=diff
==============================================================================
--- incubator/pig/branches/types/build.xml (original)
+++ incubator/pig/branches/types/build.xml Thu Apr 17 14:39:26 2008
@@ -141,7 +141,8 @@
**/test/TestGTOrEqual.java,**/test/TestLessThan.java,**/test/TestLTOrEqual.java,
**/test/TestEqualTo.java,**/test/TestNotEqualTo.java, **/test/TestPOGenerate.java,
**/test/TestProject.java, **/test/TestLoad.java, **/test/TestStore.java,
- **/test/FakeFSOutputStream.java,
+ **/test/FakeFSOutputStream.java, **/test/TestPackage.java, **/test/TestForEach.java,
+ **/test/TestLocalRearrange.java,
**/test/FakeFSInputStream.java, **/test/Util.java,
**/logicalLayer/*.java,
**/logicalLayer/schema/*.java, **/physicalLayer/topLevelOperators/*.java,
@@ -249,6 +250,9 @@
<include name="**/TestPOGenerate.java" />
<include name="**/TestLoad.java" />
<include name="**/TestStore.java" />
+ <include name="**/TestPackage.java" />
+ <include name="**/TestLocalRearrange.java" />
+ <include name="**/TestForEach.java" />
<!--
<include name="**/*Test*.java" />
<exclude name="**/TestLargeFile.java" />
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java?rev=649290&r1=649289&r2=649290&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java Thu Apr 17 14:39:26 2008
@@ -22,12 +22,14 @@
import org.apache.pig.impl.physicalLayer.topLevelOperators.POGenerate;
//import org.apache.pig.impl.physicalLayer.topLevelOperators.POGenerate;
//import org.apache.pig.impl.physicalLayer.topLevelOperators.POGlobalRearrange;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POForEach;
import org.apache.pig.impl.physicalLayer.topLevelOperators.POLoad;
-//import org.apache.pig.impl.physicalLayer.topLevelOperators.POLocalRearrange;
-//import org.apache.pig.impl.physicalLayer.topLevelOperators.POPackage;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POLocalRearrange;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POPackage;
import org.apache.pig.impl.physicalLayer.topLevelOperators.POStore;
import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
//import org.apache.pig.impl.physicalLayer.topLevelOperators.StartMap;
+import org.apache.pig.impl.plan.Operator;
import org.apache.pig.impl.plan.PlanVisitor;
/**
@@ -44,7 +46,7 @@
* @param <O>
* @param <P>
*/
-public abstract class PhyPlanVisitor<O extends PhysicalOperator, P extends PhysicalPlan<O>> extends PlanVisitor<O,P> {
+public class PhyPlanVisitor<O extends PhysicalOperator, P extends PhysicalPlan<O>> extends PlanVisitor<O,P> {
public PhyPlanVisitor(P plan) {
super(plan);
@@ -67,11 +69,17 @@
ExprPlanVisitor epv = new ExprPlanVisitor(fl.getPlan());
epv.visit();
}
-//
-// public void visitLocalRearrange(POLocalRearrange lr){
-// //do nothing
-// }
-//
+
+    /**
+     * Visits a local rearrange by walking its nested physical plan.
+     * NOTE(review): the nested plan is walked by a fresh base PhyPlanVisitor,
+     * not by this visitor, so subclass overrides are not applied to the
+     * operators inside the nested plan — confirm this is intended.
+     */
+    public void visitLocalRearrange(POLocalRearrange lr) throws ParseException {
+        PhysicalPlan<PhysicalOperator> nestedPlan = lr.getPlan();
+        new PhyPlanVisitor<PhysicalOperator, PhysicalPlan<PhysicalOperator>>(nestedPlan).visit();
+    }
+
+    /**
+     * Visits a foreach by walking its nested physical plan.
+     * NOTE(review): as with visitLocalRearrange, a fresh base PhyPlanVisitor
+     * (not this visitor) walks the nested plan — confirm this is intended.
+     */
+    public void visitForEach(POForEach fe) throws ParseException {
+        PhysicalPlan<PhysicalOperator> nestedPlan = fe.getPlan();
+        new PhyPlanVisitor<PhysicalOperator, PhysicalPlan<PhysicalOperator>>(nestedPlan).visit();
+    }
+
// public void visitGlobalRearrange(POGlobalRearrange gr){
// //do nothing
// }
@@ -80,9 +88,9 @@
// //do nothing
// }
//
-// public void visitPackage(POPackage pkg){
-// //do nothing
-// }
+    /**
+     * Visits a package operator. POPackage carries no nested plan, so there
+     * is nothing to descend into and this default implementation does nothing.
+     */
+ public void visitPackage(POPackage pkg){
+ //do nothing
+ }
public void visitGenerate(POGenerate pogen) {
//do nothing
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhysicalPlan.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhysicalPlan.java?rev=649290&r1=649289&r2=649290&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhysicalPlan.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhysicalPlan.java Thu Apr 17 14:39:26 2008
@@ -17,6 +17,7 @@
*/
package org.apache.pig.impl.physicalLayer.plans;
+import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
@@ -31,7 +32,7 @@
*
* @param <E>
*/
-public abstract class PhysicalPlan<E extends PhysicalOperator> extends OperatorPlan<E> {
+public class PhysicalPlan<E extends PhysicalOperator> extends OperatorPlan<E> {
public PhysicalPlan() {
super();
@@ -51,5 +52,12 @@
 public void explain(OutputStream out){
 //Use a plan visitor and output the current physical
 //plan into out
+ // NOTE(review): not implemented yet; currently nothing is written to out.
+ }
+
+    /**
+     * Connects {@code from} to {@code to} in this plan, then resets
+     * {@code to}'s input list to its current predecessors in this plan so
+     * that the operator's inputs always mirror the plan's edges.
+     *
+     * @throws IOException as declared for the underlying plan connect
+     */
+ @Override
+ public void connect(E from, E to)
+ throws IOException {
+ super.connect(from, to);
+ to.setInputs(getPredecessors(to));
 }
}
Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POForEach.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POForEach.java?rev=649290&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POForEach.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POForEach.java Thu Apr 17 14:39:26 2008
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.physicalLayer.topLevelOperators;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DefaultBagFactory;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.IndexedTuple;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
+
+/**
+ * The foreach operator
+ * It has an embedded physical plan that
+ * generates tuples as per the specification.
+ */
+public class POForEach extends PhysicalOperator<PhyPlanVisitor> {
+
+    private Log log = LogFactory.getLog(getClass());
+
+    // Nested plan evaluated once per input tuple; its first leaf must be a
+    // POGenerate (see setPlan).
+    PhysicalPlan<PhysicalOperator> plan;
+
+    // The generate operator that is the first leaf of plan; set by setPlan.
+    POGenerate gen;
+
+    // The generate in the nested plan can return several tuples for a
+    // single input tuple, so remember whether we are still draining it.
+    private boolean processingPlan = false;
+
+    public POForEach(OperatorKey k) {
+        this(k, -1, null);
+    }
+
+    public POForEach(OperatorKey k, int rp) {
+        this(k, rp, null);
+    }
+
+    public POForEach(OperatorKey k, List<PhysicalOperator> inp) {
+        this(k, -1, inp);
+    }
+
+    public POForEach(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+        super(k, rp, inp);
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws ParseException {
+        v.visitForEach(this);
+    }
+
+    @Override
+    public String name() {
+        return "For Each - " + mKey.toString();
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    /**
+     * Overridden since the attachment of the new input should cause the old
+     * processing to end.
+     */
+    @Override
+    public void attachInput(Tuple t) {
+        super.attachInput(t);
+        processingPlan = false;
+    }
+
+    /**
+     * Returns the next tuple produced by the nested generate operator.
+     * <p>
+     * While the generate still has output for the current input tuple, that
+     * output is returned. Once the generate signals STATUS_EOP, the next
+     * input tuple is read, attached to the nested plan and the generate is
+     * drained again. STATUS_NULL results from the generate are skipped, and
+     * an input for which the generate immediately returns STATUS_EOP simply
+     * produces no output (it does not end the whole stream).
+     *
+     * @param t type marker passed through to the generate operator
+     * @return a STATUS_OK result carrying a generated tuple, a STATUS_ERR
+     *         result from the input or the generate, or the input's
+     *         STATUS_EOP result once all input has been consumed
+     * @throws ExecException if reading input or evaluating the generate fails
+     */
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+        // Still emitting tuples for the previously read input.
+        if (processingPlan) {
+            Result res = drainGenerate(t);
+            if (res != null) {
+                return res;
+            }
+        }
+        // Read the next usable input tuple and start a fresh nested run.
+        while (true) {
+            Result inp = processInput();
+            if (inp.returnStatus == POStatus.STATUS_EOP
+                    || inp.returnStatus == POStatus.STATUS_ERR) {
+                return inp;
+            }
+            if (inp.returnStatus == POStatus.STATUS_NULL) {
+                continue;
+            }
+
+            plan.attachInput((Tuple) inp.result);
+            processingPlan = true;
+
+            Result res = drainGenerate(t);
+            if (res != null) {
+                return res;
+            }
+            // The generate produced nothing for this input; move on.
+        }
+    }
+
+    /**
+     * Pulls from the nested generate until it yields a tuple or an error.
+     *
+     * @param t type marker passed through to the generate operator
+     * @return the generate's STATUS_OK or STATUS_ERR result, or null once the
+     *         generate signals STATUS_EOP (which also clears processingPlan)
+     */
+    private Result drainGenerate(Tuple t) throws ExecException {
+        while (true) {
+            Result res = gen.getNext(t);
+            if (res.returnStatus == POStatus.STATUS_OK
+                    || res.returnStatus == POStatus.STATUS_ERR) {
+                return res;
+            }
+            if (res.returnStatus == POStatus.STATUS_EOP) {
+                processingPlan = false;
+                return null;
+            }
+            // STATUS_NULL (or any other status): nothing to emit, ask again.
+        }
+    }
+
+    public PhysicalPlan<PhysicalOperator> getPlan() {
+        return plan;
+    }
+
+    /**
+     * Sets the nested plan and caches its first leaf, which must be a
+     * POGenerate (a ClassCastException is thrown otherwise).
+     */
+    public void setPlan(PhysicalPlan<PhysicalOperator> plan) {
+        this.plan = plan;
+        gen = (POGenerate) plan.getLeaves().get(0);
+    }
+}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POLoad.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POLoad.java?rev=649290&r1=649289&r2=649290&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POLoad.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POLoad.java Thu Apr 17 14:39:26 2008
@@ -94,6 +94,7 @@
*/
 private void tearDown() throws IOException{
 is.close();
+ // Clear the flag after closing the stream; presumably this lets setUp()
+ // re-open the input on a later read — NOTE(review): confirm against setUp().
+ setUpDone = false;
 }
/**
@@ -128,7 +129,6 @@
res.returnStatus = POStatus.STATUS_OK;
} catch (IOException e) {
log.error("Received error from loader function: " + e);
- res.returnStatus = POStatus.STATUS_ERR;
return res;
}
return res;
Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POLocalRearrange.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POLocalRearrange.java?rev=649290&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POLocalRearrange.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POLocalRearrange.java Thu Apr 17 14:39:26 2008
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.physicalLayer.topLevelOperators;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.IndexedTuple;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
+
+/**
+ * The local rearrange operator is a part of the co-group
+ * implementation. It has an embedded physical plan that
+ * generates tuples of the form (grpKey, (indexed input tuple)).
+ *
+ */
+public class POLocalRearrange extends PhysicalOperator<PhyPlanVisitor> {
+
+    private Log log = LogFactory.getLog(getClass());
+
+    // Nested plan evaluated once per input tuple; its first leaf must be a
+    // POGenerate (see setPlan).
+    PhysicalPlan<PhysicalOperator> plan;
+
+    // The position of this LR in the package operator (-1 until set).
+    int index;
+
+    // The generate operator that is the first leaf of plan; set by setPlan.
+    POGenerate gen;
+
+    // The nested generate can return several tuples for a single input
+    // tuple, so remember whether we are still draining it.
+    private boolean processingPlan = false;
+
+    public POLocalRearrange(OperatorKey k) {
+        this(k, -1, null);
+    }
+
+    public POLocalRearrange(OperatorKey k, int rp) {
+        this(k, rp, null);
+    }
+
+    public POLocalRearrange(OperatorKey k, List<PhysicalOperator> inp) {
+        this(k, -1, inp);
+    }
+
+    public POLocalRearrange(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+        super(k, rp, inp);
+        index = -1;
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws ParseException {
+        v.visitLocalRearrange(this);
+    }
+
+    @Override
+    public String name() {
+        return "Local Rearrange - " + mKey.toString();
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    public int getIndex() {
+        return index;
+    }
+
+    public void setIndex(int index) {
+        this.index = index;
+    }
+
+    /**
+     * Overridden since the attachment of the new input should cause the old
+     * processing to end.
+     */
+    @Override
+    public void attachInput(Tuple t) {
+        super.attachInput(t);
+        processingPlan = false;
+    }
+
+    /**
+     * Returns the next generated tuple, converted to (key, indexed value).
+     * <p>
+     * While the nested generate still has output for the current input
+     * tuple, that output is converted and returned. Once the generate
+     * signals STATUS_EOP, the next input tuple is read and attached to the
+     * nested plan. STATUS_NULL results from the generate are skipped, and
+     * only STATUS_OK results are passed to the output conversion.
+     *
+     * @param t type marker passed through to the generate operator
+     * @return a STATUS_OK result carrying (key, IndexedTuple), a STATUS_ERR
+     *         result from the input or the generate, or the input's
+     *         STATUS_EOP result once all input has been consumed
+     * @throws ExecException if reading input or evaluating the generate fails
+     */
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+        // Still emitting tuples for the previously read input.
+        if (processingPlan) {
+            Result res = drainGenerate(t);
+            if (res != null) {
+                return res;
+            }
+        }
+        // Read the next usable input tuple and start a fresh nested run.
+        while (true) {
+            Result inp = processInput();
+            if (inp.returnStatus == POStatus.STATUS_EOP
+                    || inp.returnStatus == POStatus.STATUS_ERR) {
+                return inp;
+            }
+            if (inp.returnStatus == POStatus.STATUS_NULL) {
+                continue;
+            }
+
+            plan.attachInput((Tuple) inp.result);
+            processingPlan = true;
+
+            Result res = drainGenerate(t);
+            if (res != null) {
+                return res;
+            }
+            // The generate produced nothing for this input; move on.
+        }
+    }
+
+    /**
+     * Pulls from the nested generate until it yields a tuple or an error.
+     * Successful tuples are converted with constructLROutput.
+     *
+     * @param t type marker passed through to the generate operator
+     * @return a converted STATUS_OK result, the generate's STATUS_ERR result,
+     *         or null once the generate signals STATUS_EOP (which also clears
+     *         processingPlan)
+     */
+    private Result drainGenerate(Tuple t) throws ExecException {
+        while (true) {
+            Result res = gen.getNext(t);
+            if (res.returnStatus == POStatus.STATUS_OK) {
+                res.result = constructLROutput((Tuple) res.result);
+                return res;
+            }
+            if (res.returnStatus == POStatus.STATUS_ERR) {
+                return res;
+            }
+            if (res.returnStatus == POStatus.STATUS_EOP) {
+                processingPlan = false;
+                return null;
+            }
+            // STATUS_NULL (or any other status): nothing to emit, ask again.
+        }
+    }
+
+    /**
+     * Splits a generated tuple into (key, IndexedTuple(rest, index)).
+     * The first field is removed from genOut and used as the key; the
+     * remaining fields are wrapped in an IndexedTuple tagged with this
+     * operator's index.
+     * NOTE(review): relies on getAll() returning genOut's backing list so the
+     * remove(0) also strips the key from genOut — confirm for all Tuple types.
+     */
+    private Tuple constructLROutput(Tuple genOut){
+        //Strip the input tuple off its key which
+        //will be the first field in the tuple
+        Object key = genOut.getAll().remove(0);
+
+        //Create the indexed tuple out of the value
+        //that is remaining in the input tuple
+        IndexedTuple it = new IndexedTuple(genOut, index);
+
+        //Put the key and the indexed tuple
+        //in a tuple and return
+        Tuple outPut = new DefaultTuple();
+        outPut.append(key);
+        outPut.append(it);
+        return outPut;
+    }
+
+    public PhysicalPlan<PhysicalOperator> getPlan() {
+        return plan;
+    }
+
+    /**
+     * Sets the nested plan and caches its first leaf, which must be a
+     * POGenerate (a ClassCastException is thrown otherwise).
+     */
+    public void setPlan(PhysicalPlan<PhysicalOperator> plan) {
+        this.plan = plan;
+        gen = (POGenerate) plan.getLeaves().get(0);
+    }
+}
Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POPackage.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POPackage.java?rev=649290&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POPackage.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POPackage.java Thu Apr 17 14:39:26 2008
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.physicalLayer.topLevelOperators;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DefaultBagFactory;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.IndexedTuple;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
+/**
+ * The package operator that packages
+ * the globally rearranged tuples into
+ * output format as required by co-group.
+ * This is last stage of processing co-group.
+ * This operator has a slightly different
+ * format than other operators in that, it
+ * takes two things as input. The key being
+ * worked on and an iterator of indexed
+ * tuples that just need to
+ * be packaged into their appropriate output
+ * bags based on the index.
+ */
+public class POPackage extends PhysicalOperator<PhyPlanVisitor> {
+    // The iterator of indexed tuples that is typically provided by Hadoop.
+    Iterator<IndexedTuple> indTupIter;
+
+    // The key being worked on.
+    Object key;
+
+    // The number of inputs to this co-group; -1 until setNumInps is called.
+    int numInputs;
+
+    // inner[i] is true if inner is specified on input i. May be left null,
+    // in which case no input is treated as inner.
+    boolean[] inner;
+
+    private final Log log = LogFactory.getLog(getClass());
+
+    public POPackage(OperatorKey k) {
+        this(k, -1, null);
+    }
+
+    public POPackage(OperatorKey k, int rp) {
+        this(k, rp, null);
+    }
+
+    public POPackage(OperatorKey k, List<PhysicalOperator> inp) {
+        this(k, -1, inp);
+    }
+
+    public POPackage(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+        super(k, rp, inp);
+        numInputs = -1;
+    }
+
+    @Override
+    public String name() {
+        return "Package - " + mKey.toString();
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws ParseException {
+        v.visitPackage(this);
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    /**
+     * Attaches the required inputs
+     * @param k - the key being worked on
+     * @param inp - iterator of indexed tuples typically
+     * obtained from Hadoop
+     */
+    public void attachInput(Object k, Iterator<IndexedTuple> inp) {
+        indTupIter = inp;
+        key = k;
+    }
+
+    /**
+     * attachInput's better half: drops the attached key and iterator.
+     */
+    public void detachInput() {
+        indTupIter = null;
+        key = null;
+    }
+
+    public int getNumInps() {
+        return numInputs;
+    }
+
+    public void setNumInps(int numInps) {
+        this.numInputs = numInps;
+    }
+
+    public boolean[] getInner() {
+        return inner;
+    }
+
+    public void setInner(boolean[] inner) {
+        this.inner = inner;
+    }
+
+    /**
+     * From the inputs, constructs the output tuple
+     * for this co-group in the required format which
+     * is (key, {bag of tuples from input 1}, {bag of tuples from input 2}, ...)
+     * <p>
+     * The attached input is always detached before returning, since the
+     * iterator has been consumed.
+     *
+     * @return STATUS_OK with the output tuple; STATUS_NULL if an input marked
+     *         inner has an empty bag; STATUS_ERR if building the tuple fails
+     * @throws ExecException if no input is attached, the number of inputs
+     *         has not been set, or an indexed tuple carries an index outside
+     *         [0, numInputs)
+     */
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+        if (indTupIter == null) {
+            throw new ExecException("Incorrect usage of the Package operator. " +
+                    "No input has been attached.");
+        }
+        try {
+            if (numInputs < 0) {
+                throw new ExecException("Incorrect usage of the Package operator. " +
+                        "The number of inputs has not been set.");
+            }
+            DataBag[] dbs = bucketByIndex();
+            return buildOutput(dbs);
+        } finally {
+            // The iterator is exhausted (or unusable); never reuse it.
+            detachInput();
+        }
+    }
+
+    /**
+     * Sorts every indexed tuple from the attached iterator into one bag per
+     * input, based on the tuple's index.
+     */
+    private DataBag[] bucketByIndex() throws ExecException {
+        DataBag[] dbs = new DataBag[numInputs];
+        for (int i = 0; i < numInputs; i++) {
+            dbs[i] = DefaultBagFactory.getInstance().newDefaultBag();
+        }
+        while (indTupIter.hasNext()) {
+            IndexedTuple it = indTupIter.next();
+            if (it.index < 0 || it.index >= numInputs) {
+                throw new ExecException("Package operator received a tuple with index "
+                        + it.index + " but expects " + numInputs + " inputs.");
+            }
+            dbs[it.index].add(it.toTuple());
+        }
+        return dbs;
+    }
+
+    /** True if inner is specified for input i; a missing flag means not inner. */
+    private boolean isInner(int i) {
+        return inner != null && i < inner.length && inner[i];
+    }
+
+    /**
+     * Builds (key, bag_0, ..., bag_{n-1}), or a STATUS_NULL result if an
+     * inner input has an empty bag.
+     */
+    private Result buildOutput(DataBag[] dbs) throws ExecException {
+        Result r = new Result();
+        try {
+            Tuple res = TupleFactory.getInstance().newTuple(numInputs + 1);
+            res.set(0, key);
+            for (int i = 0; i < dbs.length; i++) {
+                if (isInner(i) && dbs[i].size() == 0) {
+                    r.returnStatus = POStatus.STATUS_NULL;
+                    return r;
+                }
+                res.set(i + 1, dbs[i]);
+            }
+            r.result = res;
+            r.returnStatus = POStatus.STATUS_OK;
+        } catch (IOException e) {
+            log.error("Received error while constructing the output tuple: " + e);
+            r.returnStatus = POStatus.STATUS_ERR;
+        }
+        return r;
+    }
+}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POStore.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POStore.java?rev=649290&r1=649289&r2=649290&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POStore.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POStore.java Thu Apr 17 14:39:26 2008
@@ -21,6 +21,8 @@
import java.io.OutputStream;
import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.pig.StoreFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.Tuple;
@@ -51,6 +53,8 @@
// PigContext passed to us by the operator creator
PigContext pc;
+ private final Log log = LogFactory.getLog(getClass());
+
public POStore(OperatorKey k) {
this(k, -1, null);
}
@@ -116,6 +120,12 @@
public Result store() throws ExecException{
try{
setUp();
+ }catch (IOException e) {
+ ExecException ee = new ExecException("Unable to setup the storer because of the exception: " + e.getMessage());
+ ee.initCause(e);
+ throw ee;
+ }
+ try{
Result res;
Tuple inpValue = null;
while(true){
@@ -136,7 +146,7 @@
tearDown();
return res;
}catch(IOException e){
- e.printStackTrace();
+ log.error("Received error from storer function: " + e);
return new Result();
}
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PhysicalOperator.java?rev=649290&r1=649289&r2=649290&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PhysicalOperator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PhysicalOperator.java Thu Apr 17 14:39:26 2008
@@ -200,7 +200,7 @@
Result res = new Result();
Tuple inpValue = null;
if (input == null && inputs == null) {
- log.warn("No inputs found. Signaling End of Processing.");
+// log.warn("No inputs found. Signaling End of Processing.");
res.returnStatus = POStatus.STATUS_EOP;
return res;
}
Added: incubator/pig/branches/types/test/org/apache/pig/test/TestForEach.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestForEach.java?rev=649290&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestForEach.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestForEach.java Thu Apr 17 14:39:26 2008
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.DefaultBagFactory;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.IndexedTuple;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POForEach;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POLocalRearrange;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject;
+import org.apache.pig.test.utils.GenPhyOp;
+import org.apache.pig.test.utils.GenRandomData;
+import org.apache.pig.test.utils.TestHelper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests {@code foreach db generate $0}.
+ * Constructs a bag that projects the input bag onto column $0 and checks
+ * that every tuple generated by the foreach is in that projected bag, and
+ * that the foreach generates exactly one tuple per input tuple.
+ */
+public class TestForEach {
+
+    POForEach fe;
+    // Type marker passed to getNext; intentionally left null.
+    Tuple t;
+    DataBag db;
+    DataBag projDB;
+
+    @Before
+    public void setUp() throws Exception {
+        Random r = new Random();
+        db = GenRandomData.genRandSmallTupDataBag(r, 10, 100);
+        projDB = TestHelper.projectBag(db, 0);
+        fe = GenPhyOp.topForEachOPWithPlan(0, db.iterator().next());
+        POProject proj = GenPhyOp.exprProject();
+        proj.setColumn(0);
+        proj.setResultType(DataType.TUPLE);
+        proj.setOverloaded(true);
+        // Wrap the whole bag in a single-field tuple; the overloaded project
+        // presumably streams the bag's tuples one at a time into the foreach.
+        Tuple projInput = new DefaultTuple();
+        projInput.append(db);
+        proj.attachInput(projInput);
+        List<PhysicalOperator> inputs = new ArrayList<PhysicalOperator>();
+        inputs.add(proj);
+        fe.setInputs(inputs);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+    }
+
+    @Test
+    public void testGetNextTuple() throws ExecException, IOException {
+        int size = 0;
+        for (Result res = fe.getNext(t); res.returnStatus != POStatus.STATUS_EOP; res = fe.getNext(t)) {
+            // Fail fast on errors instead of looping over null results.
+            assertEquals(POStatus.STATUS_OK, res.returnStatus);
+            Tuple out = (Tuple) res.result;
+            assertTrue(TestHelper.bagContains(projDB, out));
+            ++size;
+        }
+        assertEquals(db.size(), size);
+    }
+
+}
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestGreaterThan.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestGreaterThan.java?rev=649290&r1=649289&r2=649290&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestGreaterThan.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestGreaterThan.java Thu Apr 17 14:39:26 2008
@@ -162,10 +162,6 @@
return true;
}
- public static void main(String[] args) throws ExecException {
-
- }
-
@Test
public void testOperator() throws ExecException {
int TRIALS = 10;
Added: incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java?rev=649290&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java Thu Apr 17 14:39:26 2008
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.DefaultTuple;
+import org.apache.pig.data.IndexedTuple;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POLocalRearrange;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject;
+import org.apache.pig.test.utils.GenPhyOp;
+import org.apache.pig.test.utils.GenRandomData;
+import org.apache.pig.test.utils.TestHelper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests POLocalRearrange for {@code group db by $0}.
+ *
+ * Every output tuple is read as (key, IndexedTuple). The IndexedTuple must
+ * carry input index 0 and wrap a tuple of the input bag whose column $0
+ * equals the key; exactly one output tuple is expected per input tuple.
+ */
+public class TestLocalRearrange {
+
+    POLocalRearrange lr;
+
+    // Argument handed to lr.getNext(Tuple). It is never assigned, so it is
+    // always null; presumably it only selects the getNext(Tuple) overload.
+    Tuple t;
+
+    DataBag db;
+
+    @Before
+    public void setUp() throws Exception {
+        Random r = new Random();
+        db = GenRandomData.genRandSmallTupDataBag(r, 10, 100);
+        // Input index 0, grouping on column 0.
+        lr = GenPhyOp.topLocalRearrangeOPWithPlan(0, 0, db.iterator().next());
+
+        // The whole bag is wrapped in a one-field tuple and exposed through a
+        // POProject on column 0 (setOverloaded(true)); that project is the
+        // operator's only input.
+        POProject proj = GenPhyOp.exprProject();
+        proj.setColumn(0);
+        proj.setResultType(DataType.TUPLE);
+        proj.setOverloaded(true);
+        // Named distinctly so it does not shadow the field t.
+        Tuple wrapper = new DefaultTuple();
+        wrapper.append(db);
+        proj.attachInput(wrapper);
+        List<PhysicalOperator> inputs = new ArrayList<PhysicalOperator>();
+        inputs.add(proj);
+        lr.setInputs(inputs);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+    }
+
+    @Test
+    public void testGetNextTuple() throws ExecException, IOException {
+        int size = 0;
+        for (Result res = lr.getNext(t); res.returnStatus != POStatus.STATUS_EOP; res = lr.getNext(t)) {
+            Tuple out = (Tuple) res.result;
+            IndexedTuple it = (IndexedTuple) out.get(1);
+
+            // The index must be the input index the operator was built with;
+            // compared exactly rather than as floats with a tolerance.
+            assertEquals(0L, (long) it.index);
+
+            // The value tuple must come from the input bag.
+            assertTrue(TestHelper.bagContains(db, it.toTuple()));
+
+            // The output key must equal column $0 of the value tuple.
+            String inpKey = (String) it.toTuple().get(0);
+            assertEquals(inpKey, out.get(0));
+            ++size;
+        }
+
+        // Every input tuple must produce exactly one output tuple.
+        assertEquals(db.size(), size);
+    }
+
+}
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPOGenerate.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOGenerate.java?rev=649290&r1=649289&r2=649290&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPOGenerate.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOGenerate.java Thu Apr 17 14:39:26 2008
@@ -61,208 +61,208 @@
public class TestPOGenerate extends TestCase {
- DataBag cogroup;
- DataBag partialFlatten;
- DataBag simpleGenerate;
- Random r = new Random();
- BagFactory bf = BagFactory.getInstance();
- TupleFactory tf = TupleFactory.getInstance();
-
- @Before
- public void setUp() throws Exception {
- Tuple [] inputA = new Tuple[4];
- Tuple [] inputB = new Tuple[4];
- for(int i = 0; i < 4; i++) {
- inputA[i] = tf.newTuple(2);
- inputB[i] = tf.newTuple(1);
- }
- inputA[0].set(0, 'a');
- inputA[0].set(1, '1');
- inputA[1].set(0, 'b');
- inputA[1].set(1, '1');
- inputA[2].set(0, 'a');
- inputA[2].set(1, '1');
- inputA[3].set(0, 'c');
- inputA[3].set(1, '1');
- inputB[0].set(0, 'b');
- inputB[1].set(0, 'b');
- inputB[2].set(0, 'a');
- inputB[3].set(0, 'd');
- DataBag cg11 = bf.newDefaultBag();
- cg11.add(inputA[0]);
- cg11.add(inputA[2]);
- DataBag cg21 = bf.newDefaultBag();
- cg21.add(inputA[1]);
- DataBag cg31 = bf.newDefaultBag();
- cg31.add(inputA[3]);
- DataBag emptyBag = bf.newDefaultBag();
- DataBag cg12 = bf.newDefaultBag();
- cg12.add(inputB[2]);
- DataBag cg22 = bf.newDefaultBag();
- cg22.add(inputB[0]);
- cg22.add(inputB[1]);
- DataBag cg42 = bf.newDefaultBag();
- cg42.add(inputB[3]);
- Tuple [] tIn = new Tuple[4];
- for(int i = 0; i < 4; ++i) {
- tIn[i] = tf.newTuple(2);
- }
- tIn[0].set(0, cg11);
- tIn[0].set(1, cg12);
- tIn[1].set(0, cg21);
- tIn[1].set(1, cg22);
- tIn[2].set(0, cg31);
- tIn[2].set(1, emptyBag);
- tIn[3].set(0, emptyBag);
- tIn[3].set(1, cg42);
-
- cogroup = bf.newDefaultBag();
- for(int i = 0; i < 4; ++i) {
- cogroup.add(tIn[i]);
- }
-
- Tuple[] tPartial = new Tuple[4];
- for(int i = 0; i < 4; ++i) {
- tPartial[i] = tf.newTuple(2);
- tPartial[i].set(0, inputA[i].get(0));
- tPartial[i].set(1, inputA[i].get(1));
- }
-
- tPartial[0].append(cg12);
-
- tPartial[1].append(cg22);
-
- tPartial[2].append(cg12);
-
- tPartial[3].append(emptyBag);
-
- partialFlatten = bf.newDefaultBag();
- for(int i = 0; i < 4; ++i) {
- partialFlatten.add(tPartial[i]);
- }
-
- simpleGenerate = bf.newDefaultBag();
- for(int i = 0; i < 4; ++i) {
- simpleGenerate.add(inputA[i]);
- }
- /*
- System.out.println("Cogroup : " + cogroup);
- System.out.println("Partial : " + partialFlatten);
- System.out.println("Simple : " + simpleGenerate);
- */
- }
-
- public void testJoin() throws Exception {
- ExpressionOperator prj1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
- ExpressionOperator prj2 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
- prj1.setResultType(DataType.BAG);
- prj2.setResultType(DataType.BAG);
- List<Boolean> toBeFlattened = new LinkedList<Boolean>();
- toBeFlattened.add(true);
- toBeFlattened.add(true);
- ExprPlan plan1 = new ExprPlan();
- plan1.add(prj1);
- ExprPlan plan2 = new ExprPlan();
- plan2.add(prj2);
- List<ExprPlan> inputs = new LinkedList<ExprPlan>();
- inputs.add(plan1);
- inputs.add(plan2);
- PhysicalOperator poGen = new POGenerate(new OperatorKey("", r.nextLong()), inputs, toBeFlattened);
- //DataBag obtained = bf.newDefaultBag();
- for(Iterator<Tuple> it = cogroup.iterator(); it.hasNext(); ) {
- Tuple t = it.next();
- plan1.attachInput(t);
- plan2.attachInput(t);
- Result output = poGen.getNext(t);
- while(output.result != null && output.returnStatus != POStatus.STATUS_EOP) {
- //System.out.println(output.result);
- Tuple tObtained = (Tuple) output.result;
- assertTrue(tObtained.get(0).toString().equals(tObtained.get(2).toString()));
- //obtained.add((Tuple) output.result);
- output = poGen.getNext(t);
- }
- }
-
- }
-
- public void testPartialJoin() throws Exception {
- ExpressionOperator prj1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
- ExpressionOperator prj2 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
- prj1.setResultType(DataType.BAG);
- prj2.setResultType(DataType.BAG);
- List<Boolean> toBeFlattened = new LinkedList<Boolean>();
- toBeFlattened.add(true);
- toBeFlattened.add(false);
- ExprPlan plan1 = new ExprPlan();
- plan1.add(prj1);
- ExprPlan plan2 = new ExprPlan();
- plan2.add(prj2);
- List<ExprPlan> inputs = new LinkedList<ExprPlan>();
- inputs.add(plan1);
- inputs.add(plan2);
- PhysicalOperator poGen = new POGenerate(new OperatorKey("", r.nextLong()), inputs, toBeFlattened);
-
- //DataBag obtained = bf.newDefaultBag();
- List<String> obtained = new LinkedList<String>();
- for(Iterator<Tuple> it = cogroup.iterator(); it.hasNext(); ) {
- Tuple t = it.next();
- plan1.attachInput(t);
- plan2.attachInput(t);
- Result output = poGen.getNext(t);
- while(output.result != null && output.returnStatus != POStatus.STATUS_EOP) {
- //System.out.println(output.result);
- obtained.add(((Tuple) output.result).toString());
- output = poGen.getNext(t);
- }
- }
- int count = 0;
- for(Iterator<Tuple> it = partialFlatten.iterator(); it.hasNext(); ) {
- Tuple t = it.next();
- assertTrue(obtained.contains(t.toString()));
- ++count;
- }
- assertEquals(partialFlatten.size(), count);
-
- }
-
- public void testSimpleGenerate() throws Exception {
- ExpressionOperator prj1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
- ExpressionOperator prj2 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
- prj1.setResultType(DataType.BAG);
- prj2.setResultType(DataType.BAG);
- List<Boolean> toBeFlattened = new LinkedList<Boolean>();
- toBeFlattened.add(true);
- toBeFlattened.add(false);
- ExprPlan plan1 = new ExprPlan();
- plan1.add(prj1);
- ExprPlan plan2 = new ExprPlan();
- plan2.add(prj2);
- List<ExprPlan> inputs = new LinkedList<ExprPlan>();
- inputs.add(plan1);
- inputs.add(plan2);
- PhysicalOperator poGen = new POGenerate(new OperatorKey("", r.nextLong()), inputs, toBeFlattened);
-
- //DataBag obtained = bf.newDefaultBag();
- List<String> obtained = new LinkedList<String>();
- for(Iterator<Tuple> it = simpleGenerate.iterator(); it.hasNext(); ) {
- Tuple t = it.next();
- plan1.attachInput(t);
- plan2.attachInput(t);
- Result output = poGen.getNext(t);
- while(output.result != null && output.returnStatus != POStatus.STATUS_EOP) {
- //System.out.println(output.result);
- obtained.add(((Tuple) output.result).toString());
- output = poGen.getNext(t);
- }
- }
-
- int count = 0;
- for(Iterator<Tuple> it = simpleGenerate.iterator(); it.hasNext(); ) {
- Tuple t = it.next();
- assertTrue(obtained.contains(t.toString()));
- ++count;
- }
- assertEquals(simpleGenerate.size(), count);
-
- }
+ // Cogroup of relations A and B (built in setUp) on column 0: one
+ // (bagFromA, bagFromB) tuple per key 'a', 'b', 'c', 'd'.
+ DataBag cogroup;
+ // Expected rows when A's bag is flattened and B's bag is kept as a bag:
+ // (A.$0, A.$1, bagFromB) for each tuple of A.
+ DataBag partialFlatten;
+ // The tuples of A themselves.
+ DataBag simpleGenerate;
+ // Source of random OperatorKey ids.
+ Random r = new Random();
+ BagFactory bf = BagFactory.getInstance();
+ TupleFactory tf = TupleFactory.getInstance();
+
+ @Before
+ public void setUp() throws Exception {
+ // A = {('a','1'), ('b','1'), ('a','1'), ('c','1')}, B = {('b'), ('b'), ('a'), ('d')}.
+ // All fields are Character values, not Strings.
+ Tuple [] inputA = new Tuple[4];
+ Tuple [] inputB = new Tuple[4];
+ for(int i = 0; i < 4; i++) {
+ inputA[i] = tf.newTuple(2);
+ inputB[i] = tf.newTuple(1);
+ }
+ inputA[0].set(0, 'a');
+ inputA[0].set(1, '1');
+ inputA[1].set(0, 'b');
+ inputA[1].set(1, '1');
+ inputA[2].set(0, 'a');
+ inputA[2].set(1, '1');
+ inputA[3].set(0, 'c');
+ inputA[3].set(1, '1');
+ inputB[0].set(0, 'b');
+ inputB[1].set(0, 'b');
+ inputB[2].set(0, 'a');
+ inputB[3].set(0, 'd');
+ // cgKI holds the tuples of input I (1 = A, 2 = B) whose key is the K-th
+ // key (1 = 'a', 2 = 'b', 3 = 'c', 4 = 'd'). The same tuple objects are
+ // shared with simpleGenerate below.
+ DataBag cg11 = bf.newDefaultBag();
+ cg11.add(inputA[0]);
+ cg11.add(inputA[2]);
+ DataBag cg21 = bf.newDefaultBag();
+ cg21.add(inputA[1]);
+ DataBag cg31 = bf.newDefaultBag();
+ cg31.add(inputA[3]);
+ // Stands in for an input that has no tuple with the key ('c' in B, 'd' in A).
+ DataBag emptyBag = bf.newDefaultBag();
+ DataBag cg12 = bf.newDefaultBag();
+ cg12.add(inputB[2]);
+ DataBag cg22 = bf.newDefaultBag();
+ cg22.add(inputB[0]);
+ cg22.add(inputB[1]);
+ DataBag cg42 = bf.newDefaultBag();
+ cg42.add(inputB[3]);
+ // One cogroup row per key: (bag from A, bag from B).
+ Tuple [] tIn = new Tuple[4];
+ for(int i = 0; i < 4; ++i) {
+ tIn[i] = tf.newTuple(2);
+ }
+ tIn[0].set(0, cg11);
+ tIn[0].set(1, cg12);
+ tIn[1].set(0, cg21);
+ tIn[1].set(1, cg22);
+ tIn[2].set(0, cg31);
+ tIn[2].set(1, emptyBag);
+ tIn[3].set(0, emptyBag);
+ tIn[3].set(1, cg42);
+
+ cogroup = bf.newDefaultBag();
+ for(int i = 0; i < 4; ++i) {
+ cogroup.add(tIn[i]);
+ }
+
+ // Expected partial-flatten rows: a copy of each A tuple followed by B's
+ // bag for that tuple's key (cg12 serves both 'a' rows).
+ Tuple[] tPartial = new Tuple[4];
+ for(int i = 0; i < 4; ++i) {
+ tPartial[i] = tf.newTuple(2);
+ tPartial[i].set(0, inputA[i].get(0));
+ tPartial[i].set(1, inputA[i].get(1));
+ }
+
+ tPartial[0].append(cg12);
+
+ tPartial[1].append(cg22);
+
+ tPartial[2].append(cg12);
+
+ tPartial[3].append(emptyBag);
+
+ partialFlatten = bf.newDefaultBag();
+ for(int i = 0; i < 4; ++i) {
+ partialFlatten.add(tPartial[i]);
+ }
+
+ simpleGenerate = bf.newDefaultBag();
+ for(int i = 0; i < 4; ++i) {
+ simpleGenerate.add(inputA[i]);
+ }
+ /*
+ System.out.println("Cogroup : " + cogroup);
+ System.out.println("Partial : " + partialFlatten);
+ System.out.println("Simple : " + simpleGenerate);
+ */
+ }
+
+    /**
+     * Flattens both cogrouped bags, which joins A and B on column 0. Checks
+     * that in every output tuple the key taken from A ($0) equals the key
+     * taken from B ($2), and that output is actually produced.
+     */
+    public void testJoin() throws Exception {
+        ExpressionOperator prj1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+        ExpressionOperator prj2 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
+        prj1.setResultType(DataType.BAG);
+        prj2.setResultType(DataType.BAG);
+        List<Boolean> toBeFlattened = new LinkedList<Boolean>();
+        toBeFlattened.add(true);
+        toBeFlattened.add(true);
+        ExprPlan plan1 = new ExprPlan();
+        plan1.add(prj1);
+        ExprPlan plan2 = new ExprPlan();
+        plan2.add(prj2);
+        List<ExprPlan> inputs = new LinkedList<ExprPlan>();
+        inputs.add(plan1);
+        inputs.add(plan2);
+        PhysicalOperator poGen = new POGenerate(new OperatorKey("", r.nextLong()), inputs, toBeFlattened);
+
+        int joined = 0;
+        for (Iterator<Tuple> it = cogroup.iterator(); it.hasNext(); ) {
+            Tuple t = it.next();
+            plan1.attachInput(t);
+            plan2.attachInput(t);
+            Result output = poGen.getNext(t);
+            while (output.result != null && output.returnStatus != POStatus.STATUS_EOP) {
+                Tuple tObtained = (Tuple) output.result;
+                assertEquals(tObtained.get(0).toString(), tObtained.get(2).toString());
+                ++joined;
+                output = poGen.getNext(t);
+            }
+        }
+        // Without this check the test would pass vacuously if nothing were
+        // produced. Key 'a' pairs 2 A-tuples with 1 B-tuple and key 'b' pairs
+        // 1 A-tuple with 2 B-tuples; keys 'c' and 'd' have an empty side.
+        assertTrue("expected at least 4 joined tuples, got " + joined, joined >= 4);
+    }
+
+    /**
+     * Flattens only the first cogrouped bag (from A) and keeps the second
+     * (from B) as a bag; every expected row in partialFlatten must appear
+     * among the generated tuples (compared by their string form).
+     */
+    public void testPartialJoin() throws Exception {
+        ExpressionOperator projA = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+        ExpressionOperator projB = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
+        projA.setResultType(DataType.BAG);
+        projB.setResultType(DataType.BAG);
+
+        ExprPlan planA = new ExprPlan();
+        planA.add(projA);
+        ExprPlan planB = new ExprPlan();
+        planB.add(projB);
+
+        List<ExprPlan> plans = new LinkedList<ExprPlan>();
+        plans.add(planA);
+        plans.add(planB);
+        List<Boolean> flattenFlags = new LinkedList<Boolean>();
+        flattenFlags.add(true);   // flatten A's bag
+        flattenFlags.add(false);  // keep B's bag intact
+        PhysicalOperator poGen = new POGenerate(new OperatorKey("", r.nextLong()), plans, flattenFlags);
+
+        List<String> obtained = new LinkedList<String>();
+        Iterator<Tuple> groups = cogroup.iterator();
+        while (groups.hasNext()) {
+            Tuple group = groups.next();
+            planA.attachInput(group);
+            planB.attachInput(group);
+            for (Result output = poGen.getNext(group);
+                    output.result != null && output.returnStatus != POStatus.STATUS_EOP;
+                    output = poGen.getNext(group)) {
+                obtained.add(((Tuple) output.result).toString());
+            }
+        }
+
+        // NOTE(review): count only counts the elements of partialFlatten, so
+        // the final assertEquals holds whenever the containment checks pass.
+        int count = 0;
+        Iterator<Tuple> expected = partialFlatten.iterator();
+        while (expected.hasNext()) {
+            assertTrue(obtained.contains(expected.next().toString()));
+            count++;
+        }
+        assertEquals(partialFlatten.size(), count);
+    }
+
+    /**
+     * Runs generate $0 (flattened), $1 over the tuples of A and checks that
+     * every input tuple of simpleGenerate appears among the outputs
+     * (compared by their string form).
+     */
+    public void testSimpleGenerate() throws Exception {
+        ExpressionOperator firstCol = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+        ExpressionOperator secondCol = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
+        // NOTE(review): both projections are declared BAG although the columns
+        // hold Character values (see setUp); confirm this is intended.
+        firstCol.setResultType(DataType.BAG);
+        secondCol.setResultType(DataType.BAG);
+
+        ExprPlan firstPlan = new ExprPlan();
+        firstPlan.add(firstCol);
+        ExprPlan secondPlan = new ExprPlan();
+        secondPlan.add(secondCol);
+
+        List<ExprPlan> plans = new LinkedList<ExprPlan>();
+        plans.add(firstPlan);
+        plans.add(secondPlan);
+        List<Boolean> flattenFlags = new LinkedList<Boolean>();
+        flattenFlags.add(true);
+        flattenFlags.add(false);
+        PhysicalOperator poGen = new POGenerate(new OperatorKey("", r.nextLong()), plans, flattenFlags);
+
+        List<String> obtained = new LinkedList<String>();
+        Iterator<Tuple> rows = simpleGenerate.iterator();
+        while (rows.hasNext()) {
+            Tuple row = rows.next();
+            firstPlan.attachInput(row);
+            secondPlan.attachInput(row);
+            for (Result output = poGen.getNext(row);
+                    output.result != null && output.returnStatus != POStatus.STATUS_EOP;
+                    output = poGen.getNext(row)) {
+                obtained.add(((Tuple) output.result).toString());
+            }
+        }
+
+        int count = 0;
+        Iterator<Tuple> expected = simpleGenerate.iterator();
+        while (expected.hasNext()) {
+            assertTrue(obtained.contains(expected.next().toString()));
+            count++;
+        }
+        assertEquals(simpleGenerate.size(), count);
+    }
}
Added: incubator/pig/branches/types/test/org/apache/pig/test/TestPackage.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPackage.java?rev=649290&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPackage.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPackage.java Thu Apr 17 14:39:26 2008
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.DefaultBagFactory;
+import org.apache.pig.data.IndexedTuple;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POPackage;
+import org.apache.pig.test.utils.GenRandomData;
+import org.apache.pig.test.utils.TestHelper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests POPackage with two inputs, using keys of every type returned by
+ * DataType.genAllTypes(), with input 0 first non-inner and then inner.
+ */
+public class TestPackage {
+
+    @Before
+    public void setUp() throws Exception {
+    }
+
+    @After
+    public void tearDown() throws Exception {
+    }
+
+    /**
+     * Adapts an Iterator over Tuples that are IndexedTuples into an Iterator
+     * (and single-use Iterable) over IndexedTuples, the form handed to
+     * POPackage.attachInput below.
+     */
+    static class ITIterator implements Iterator<IndexedTuple>,
+            Iterable<IndexedTuple> {
+        private final Iterator<Tuple> it;
+
+        public ITIterator(Iterator<Tuple> it) {
+            this.it = it;
+        }
+
+        public boolean hasNext() {
+            return it.hasNext();
+        }
+
+        public IndexedTuple next() {
+            return (IndexedTuple) it.next();
+        }
+
+        /** Removal is not supported; fail loudly instead of silently ignoring the call. */
+        public void remove() {
+            throw new UnsupportedOperationException("ITIterator does not support remove()");
+        }
+
+        /** Returns this iterator itself, so it can be traversed only once. */
+        public Iterator<IndexedTuple> iterator() {
+            return this;
+        }
+    }
+
+    /**
+     * Builds two random bags and tags their tuples with input indices 0 and 1.
+     * Runs them through a two-input POPackage under {@code key} and checks the
+     * packaged (key, bag0, bag1) tuple.
+     *
+     * @param key   the group key handed to POPackage; it must come back as the
+     *              very same object
+     * @param inner per-input inner flags; when inner[0] is set, input 0 gets
+     *              no tuples and a STATUS_NULL result counts as success
+     * @return true if the package output matches the inputs
+     */
+    public static boolean test(Object key, boolean[] inner) throws ExecException, IOException {
+        Random r = new Random();
+        DataBag db1 = GenRandomData.genRandSmallTupDataBag(r, 10, 100);
+        DataBag db2 = GenRandomData.genRandSmallTupDataBag(r, 10, 100);
+        DataBag db = DefaultBagFactory.getInstance().newDefaultBag();
+        Iterator<Tuple> db1Iter = db1.iterator();
+        if (!inner[0]) {
+            while (db1Iter.hasNext()) {
+                db.add(new IndexedTuple(db1Iter.next(), 0));
+            }
+        }
+        Iterator<Tuple> db2Iter = db2.iterator();
+        while (db2Iter.hasNext()) {
+            db.add(new IndexedTuple(db2Iter.next(), 1));
+        }
+
+        POPackage pop = new POPackage(new OperatorKey("", r.nextLong()));
+        pop.setNumInps(2);
+        pop.setInner(inner);
+        pop.attachInput(key, new ITIterator(db.iterator()));
+        Tuple t = null;
+        Result res = pop.getNext(t);
+        if (res.returnStatus == POStatus.STATUS_NULL && inner[0])
+            return true;
+        if (res.returnStatus != POStatus.STATUS_OK)
+            return false;
+
+        t = (Tuple) res.result;
+        Object outKey = t.get(0);
+        DataBag outDb1 = (DataBag) t.get(1);
+        DataBag outDb2 = (DataBag) t.get(2);
+
+        // Identity comparison: the key is expected to be passed through unchanged.
+        return outKey == key
+                && TestHelper.compareBags(db1, outDb1)
+                && TestHelper.compareBags(db2, outDb2);
+    }
+
+    /**
+     * Runs {@link #test(Object, boolean[])} with a random key of type {@code t},
+     * to show that POPackage has no type-specific code.
+     *
+     * @return the result of the keyed test, or false for an unhandled type
+     */
+    private static boolean test(byte t, boolean[] inner) throws ExecException, IOException {
+        Random r = new Random();
+        switch (t) {
+        case DataType.BAG:
+            return test(GenRandomData.genRandSmallTupDataBag(r, 10, 100), inner);
+        case DataType.BOOLEAN:
+            return test(r.nextBoolean(), inner);
+        case DataType.BYTEARRAY:
+            return test(GenRandomData.genRandDBA(r), inner);
+        case DataType.CHARARRAY:
+            return test(GenRandomData.genRandString(r), inner);
+        case DataType.DOUBLE:
+            return test(r.nextDouble(), inner);
+        case DataType.FLOAT:
+            return test(r.nextFloat(), inner);
+        case DataType.INTEGER:
+            // An INTEGER key must be an Integer, not a Long.
+            return test(r.nextInt(), inner);
+        case DataType.LONG:
+            return test(r.nextLong(), inner);
+        case DataType.MAP:
+            return test(GenRandomData.genRandMap(r, 10), inner);
+        case DataType.TUPLE:
+            return test(GenRandomData.genRandSmallBagTuple(r, 10, 100), inner);
+        }
+        return false;
+    }
+
+    @Test
+    public void testOperator() throws ExecException, IOException {
+        byte[] types = DataType.genAllTypes();
+        final int NUM_TRIALS = 10;
+        for (byte b : types) {
+            boolean succ = true;
+            boolean[] inner1 = { false, false };
+            for (int i = 0; i < NUM_TRIALS; i++)
+                succ &= test(b, inner1);
+            assertEquals("type " + b + ", no inner input", true, succ);
+
+            boolean[] inner2 = { true, false };
+            for (int i = 0; i < NUM_TRIALS; i++)
+                succ &= test(b, inner2);
+            assertEquals("type " + b + ", input 0 inner", true, succ);
+        }
+    }
+}
Modified: incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java?rev=649290&r1=649289&r2=649290&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/utils/GenPhyOp.java Thu Apr 17 14:39:26 2008
@@ -18,23 +18,32 @@
package org.apache.pig.test.utils;
import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Random;
import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.OperatorKey;
import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
+import org.apache.pig.impl.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.impl.physicalLayer.topLevelOperators.POFilter;
-//import org.apache.pig.impl.physicalLayer.topLevelOperators.POGenerate;
-//import org.apache.pig.impl.physicalLayer.topLevelOperators.POGlobalRearrange;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POForEach;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POGenerate;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
+// import org.apache.pig.impl.physicalLayer.topLevelOperators.POGlobalRearrange;
import org.apache.pig.impl.physicalLayer.topLevelOperators.POLoad;
-//import org.apache.pig.impl.physicalLayer.topLevelOperators.POLocalRearrange;
-//import org.apache.pig.impl.physicalLayer.topLevelOperators.POPackage;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POLocalRearrange;
+// import org.apache.pig.impl.physicalLayer.topLevelOperators.POPackage;
import org.apache.pig.impl.physicalLayer.topLevelOperators.POStore;
-//import org.apache.pig.impl.physicalLayer.topLevelOperators.StartMap;
+// import org.apache.pig.impl.physicalLayer.topLevelOperators.StartMap;
import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ConstantExpression;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ExpressionOperator;
import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject;
-//import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.EqualToExpr;
-//import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.GTOrEqualToExpr;
+// import
+// org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.EqualToExpr;
+// import
+// org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.GTOrEqualToExpr;
import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.EqualToExpr;
import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.GTOrEqualToExpr;
import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.binaryExprOps.comparators.GreaterThanExpr;
@@ -44,139 +53,278 @@
public class GenPhyOp {
static Random r = new Random();
- public static ConstantExpression exprConst(){
- ConstantExpression ret = new ConstantExpression(new OperatorKey("",r.nextLong()));
+
+    /** Creates a ConstantExpression with a fresh random OperatorKey. */
+    public static ConstantExpression exprConst() {
+        OperatorKey key = new OperatorKey("", r.nextLong());
+        ConstantExpression ret = new ConstantExpression(key);
return ret;
}
- public static GreaterThanExpr compGreaterThanExpr(){
- GreaterThanExpr ret = new GreaterThanExpr(new OperatorKey("",r.nextLong()));
+    /** Creates a GreaterThanExpr with a fresh random OperatorKey. */
+    public static GreaterThanExpr compGreaterThanExpr() {
+        OperatorKey key = new OperatorKey("", r.nextLong());
+        GreaterThanExpr ret = new GreaterThanExpr(key);
return ret;
}
-
- public static POProject exprProject(){
- POProject ret = new POProject(new OperatorKey("",r.nextLong()));
+
+    /** Creates a POProject with a fresh random OperatorKey; callers set the column. */
+    public static POProject exprProject() {
+        OperatorKey key = new OperatorKey("", r.nextLong());
+        POProject ret = new POProject(key);
return ret;
}
-
- public static GTOrEqualToExpr compGTOrEqualToExpr(){
- GTOrEqualToExpr ret = new GTOrEqualToExpr(new OperatorKey("",r.nextLong()));
+
+    /** Creates a GTOrEqualToExpr with a fresh random OperatorKey. */
+    public static GTOrEqualToExpr compGTOrEqualToExpr() {
+        OperatorKey key = new OperatorKey("", r.nextLong());
+        GTOrEqualToExpr ret = new GTOrEqualToExpr(key);
return ret;
}
-
- public static EqualToExpr compEqualToExpr(){
- EqualToExpr ret = new EqualToExpr(new OperatorKey("",r.nextLong()));
+
+    /** Creates an EqualToExpr with a fresh random OperatorKey. */
+    public static EqualToExpr compEqualToExpr() {
+        OperatorKey key = new OperatorKey("", r.nextLong());
+        EqualToExpr ret = new EqualToExpr(key);
return ret;
}
-
- public static NotEqualToExpr compNotEqualToExpr(){
- NotEqualToExpr ret = new NotEqualToExpr(new OperatorKey("",r.nextLong()));
+
+    /** Creates a NotEqualToExpr with a fresh random OperatorKey. */
+    public static NotEqualToExpr compNotEqualToExpr() {
+        return new NotEqualToExpr(new OperatorKey("", r.nextLong()));
+    }
+
+    /** Creates a LessThanExpr with a fresh random OperatorKey. */
+    public static LessThanExpr compLessThanExpr() {
+        return new LessThanExpr(new OperatorKey("", r.nextLong()));
+    }
+
+    /** Creates an LTOrEqualToExpr with a fresh random OperatorKey. */
+    public static LTOrEqualToExpr compLTOrEqualToExpr() {
+        return new LTOrEqualToExpr(new OperatorKey("", r.nextLong()));
+    }
+
+    /** Creates a bare POLocalRearrange with a fresh random OperatorKey; no plan is set here. */
+    public static POLocalRearrange topLocalRearrangeOp() {
+        OperatorKey key = new OperatorKey("", r.nextLong());
+        POLocalRearrange ret = new POLocalRearrange(key);
return ret;
}
- public static LessThanExpr compLessThanExpr(){
- LessThanExpr ret = new LessThanExpr(new OperatorKey("",r.nextLong()));
+    /** Creates a bare POForEach with a fresh random OperatorKey; no plan is set here. */
+    public static POForEach topForEachOp() {
+        return new POForEach(new OperatorKey("", r.nextLong()));
+    }
+
+    /** Creates a bare POGenerate with a fresh random OperatorKey; no expression plans are set here. */
+    public static POGenerate topGenerateOp() {
+        OperatorKey key = new OperatorKey("", r.nextLong());
+        POGenerate ret = new POGenerate(key);
return ret;
}
- public static LTOrEqualToExpr compLTOrEqualToExpr(){
- LTOrEqualToExpr ret = new LTOrEqualToExpr(new OperatorKey("",r.nextLong()));
+    /**
+     * Creates the POGenerate operator for {@code generate grpCol, *}: one
+     * projection of column grpCol followed by one projection of every column
+     * of the input. None of them is flattened.
+     *
+     * @param grpCol the column to be grouped on
+     * @param sample a sample tuple used to infer the result types and the
+     *               number of columns that {@code *} expands to
+     * @return the POGenerate with the expression plans for
+     *         {@code generate grpCol, *} set
+     * @throws IOException propagated from the Tuple and plan calls below
+     */
+    public static POGenerate topGenerateOpWithExPlan(int grpCol, Tuple sample) throws IOException {
+        List<Boolean> toBeFlattened = new LinkedList<Boolean>();
+        List<ExprPlan> inputs = new LinkedList<ExprPlan>();
+
+        // generate grpCol
+        POProject prj1 = new POProject(new OperatorKey("", r.nextLong()), -1, grpCol);
+        prj1.setResultType(sample.getType(grpCol));
+        prj1.setOverloaded(false);
+        ExprPlan plan1 = new ExprPlan();
+        plan1.add(prj1);
+        toBeFlattened.add(false);
+        inputs.add(plan1);
+
+        // generate * : one projection per column of the sample. Each project
+        // is owned by its own plan, so no array of projects is kept (assigning
+        // to an enhanced-for variable would never have filled one anyway).
+        for (int i = 0; i < sample.size(); i++) {
+            POProject project = new POProject(new OperatorKey("", r.nextLong()), -1, i);
+            project.setResultType(sample.getType(i));
+            project.setOverloaded(false);
+
+            ExprPlan pl = new ExprPlan();
+            pl.add(project);
+
+            toBeFlattened.add(false);
+            inputs.add(pl);
+        }
+
+        POGenerate ret = new POGenerate(new OperatorKey("", r.nextLong()),
+                inputs, toBeFlattened);
return ret;
}
-// public static POLocalRearrange topLocalRearrangeOp(){
-// POLocalRearrange ret = new POLocalRearrange(new OperatorKey("",r.nextLong()));
-// return ret;
-// }
-//
-// public static POGenerate topGenerateOp(){
-// POGenerate ret = new POGenerate(new OperatorKey("",r.nextLong()));
-// return ret;
-// }
-//
- public static POLoad topLoadOp(){
- POLoad ret = new POLoad(new OperatorKey("",r.nextLong()));
- return ret;
- }
+    /**
+     * Creates the POGenerate operator for {@code generate field}: a single,
+     * unflattened projection of column {@code field}.
+     *
+     * @param field  the column to be generated
+     * @param sample a sample tuple used to infer the projection's result type
+     * @return the POGenerate with the expression plan for
+     *         {@code generate field} set
+     * @throws IOException propagated from the Tuple and plan calls below
+     */
+    public static POGenerate topGenerateOpWithExPlanForFe(int field, Tuple sample) throws IOException {
+        POProject project = new POProject(new OperatorKey("", r.nextLong()), -1, field);
+        project.setResultType(sample.getType(field));
+        project.setOverloaded(false);
+
+        ExprPlan plan = new ExprPlan();
+        plan.add(project);
+
+        List<ExprPlan> plans = new LinkedList<ExprPlan>();
+        plans.add(plan);
+        List<Boolean> flatten = new LinkedList<Boolean>();
+        flatten.add(false);
+
+        return new POGenerate(new OperatorKey("", r.nextLong()), plans, flatten);
+    }
- public static POFilter topFilterOp(){
- POFilter ret = new POFilter(new OperatorKey("",r.nextLong()));
+    /**
+     * Creates a POLocalRearrange with the given input index for
+     * {@code group by grpCol}. Its plan holds the single POGenerate built by
+     * topGenerateOpWithExPlan.
+     *
+     * @param index  the input index of this POLocalRearrange operator
+     * @param grpCol the column to be grouped on
+     * @param sample sample tuple forwarded to topGenerateOpWithExPlan
+     * @return the POLocalRearrange, with result type TUPLE
+     * @throws IOException propagated from topGenerateOpWithExPlan or the plan calls
+     */
+    public static POLocalRearrange topLocalRearrangeOPWithPlan(int index, int grpCol, Tuple sample) throws IOException {
+        POGenerate generate = topGenerateOpWithExPlan(grpCol, sample);
+        PhysicalPlan<PhysicalOperator> plan = new PhysicalPlan<PhysicalOperator>();
+        plan.add(generate);
+
+        POLocalRearrange ret = topLocalRearrangeOp();
+        ret.setPlan(plan);
+        ret.setIndex(index);
+        ret.setResultType(DataType.TUPLE);
return ret;
}
- public static POFilter topFilterOpWithExPlan(int lhsVal, int rhsVal) throws IOException{
- POFilter ret = new POFilter(new OperatorKey("",r.nextLong()));
-
+ /**
+ * Builds a POForEach for 'foreach A generate field'. Its inner physical
+ * plan contains the single POGenerate from
+ * topGenerateOpWithExPlanForFe(field, sample), and its result type is
+ * set to TUPLE.
+ *
+ * @param field - column to generate
+ * @param sample - sample tuple forwarded to topGenerateOpWithExPlanForFe
+ * @return - the configured POForEach
+ * @throws IOException declared because building the inner POGenerate may throw it
+ */
+ public static POForEach topForEachOPWithPlan(int field, Tuple sample) throws IOException {
+ POGenerate generate = topGenerateOpWithExPlanForFe(field, sample);
+ PhysicalPlan<PhysicalOperator> innerPlan = new PhysicalPlan<PhysicalOperator>();
+ innerPlan.add(generate);
+
+ POForEach forEach = topForEachOp();
+ forEach.setPlan(innerPlan);
+ forEach.setResultType(DataType.TUPLE);
+ return forEach;
+ }
+
+ /** Creates a POLoad keyed by the next value drawn from {@code r}. */
+ public static POLoad topLoadOp() {
+ return new POLoad(new OperatorKey("", r.nextLong()));
+ }
+
+ /** Creates a POFilter with no plan, keyed by the next value drawn from {@code r}. */
+ public static POFilter topFilterOp() {
+ return new POFilter(new OperatorKey("", r.nextLong()));
+ }
+
+ /**
+ * Creates a POFilter whose predicate plan compares two constants,
+ * {@code lhsVal > rhsVal}: a GreaterThanExpr with operand type INTEGER
+ * whose inputs are two ConstantExpressions.
+ *
+ * @param lhsVal - value of the left-hand constant
+ * @param rhsVal - value of the right-hand constant
+ * @return - the POFilter with the expression plan set
+ * @throws IOException declared because the plan-building calls may throw it
+ */
+ public static POFilter topFilterOpWithExPlan(int lhsVal, int rhsVal)
+ throws IOException {
+ POFilter ret = new POFilter(new OperatorKey("", r.nextLong()));
+
ConstantExpression ce1 = GenPhyOp.exprConst();
ce1.setValue(lhsVal);
-
+
ConstantExpression ce2 = GenPhyOp.exprConst();
ce2.setValue(rhsVal);
-
+
GreaterThanExpr gr = GenPhyOp.compGreaterThanExpr();
gr.setLhs(ce1);
gr.setRhs(ce2);
gr.setOperandType(DataType.INTEGER);
-
+
ExprPlan ep = new ExprPlan();
ep.add(ce1);
ep.add(ce2);
ep.add(gr);
-
+
+ // Both constants feed the comparison node in the plan.
ep.connect(ce1, gr);
ep.connect(ce2, gr);
-
+
ret.setPlan(ep);
-
+
return ret;
}
-
- public static POFilter topFilterOpWithProj(int col, int rhsVal) throws IOException{
- POFilter ret = new POFilter(new OperatorKey("",r.nextLong()));
-
+
+ /**
+ * Creates a POFilter whose predicate plan is {@code $col > rhsVal}.
+ * Column {@code col} is projected (result type INTEGER, not overloaded)
+ * and compared with a constant through a GreaterThanExpr of operand
+ * type INTEGER.
+ *
+ * @param col - index of the column to project
+ * @param rhsVal - constant the column is compared against
+ * @return - the POFilter with the expression plan set
+ * @throws IOException declared because the plan-building calls may throw it
+ */
+ public static POFilter topFilterOpWithProj(int col, int rhsVal)
+ throws IOException {
+ POFilter ret = new POFilter(new OperatorKey("", r.nextLong()));
+
POProject proj = exprProject();
proj.setResultType(DataType.INTEGER);
proj.setColumn(col);
proj.setOverloaded(false);
-
+
ConstantExpression ce2 = GenPhyOp.exprConst();
ce2.setValue(rhsVal);
-
+
GreaterThanExpr gr = GenPhyOp.compGreaterThanExpr();
gr.setLhs(proj);
gr.setRhs(ce2);
gr.setOperandType(DataType.INTEGER);
-
+
ExprPlan ep = new ExprPlan();
ep.add(proj);
ep.add(ce2);
ep.add(gr);
-
+
+ // The projected column and the constant both feed the comparison node.
ep.connect(proj, gr);
ep.connect(ce2, gr);
-
+
ret.setPlan(ep);
-
+
return ret;
}
-//
-// public static POGlobalRearrange topGlobalRearrangeOp(){
-// POGlobalRearrange ret = new POGlobalRearrange(new OperatorKey("",r.nextLong()));
-// return ret;
-// }
-//
-// public static POPackage topPackageOp(){
-// POPackage ret = new POPackage(new OperatorKey("",r.nextLong()));
-// return ret;
-// }
-//
- public static POStore topStoreOp(){
- POStore ret = new POStore(new OperatorKey("",r.nextLong()));
- return ret;
- }
-//
-// public static StartMap topStartMapOp(){
-// StartMap ret = new StartMap(new OperatorKey("",r.nextLong()));
-// return ret;
-// }
+
+ //
+ // public static POGlobalRearrange topGlobalRearrangeOp(){
+ // POGlobalRearrange ret = new POGlobalRearrange(new
+ // OperatorKey("",r.nextLong()));
+ // return ret;
+ // }
+ //
+ // public static POPackage topPackageOp(){
+ // POPackage ret = new POPackage(new OperatorKey("",r.nextLong()));
+ // return ret;
+ // }
+ //
+ /** Creates a POStore keyed by the next value drawn from {@code r}. */
+ public static POStore topStoreOp() {
+ return new POStore(new OperatorKey("", r.nextLong()));
+ }
+ //
+ // public static StartMap topStartMapOp(){
+ // StartMap ret = new StartMap(new OperatorKey("",r.nextLong()));
+ // return ret;
+ // }
}
Modified: incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java?rev=649290&r1=649289&r2=649290&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/utils/TestHelper.java Thu Apr 17 14:39:26 2008
@@ -17,9 +17,12 @@
*/
package org.apache.pig.test.utils;
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DefaultBagFactory;
+import org.apache.pig.data.DefaultTuple;
import org.apache.pig.data.Tuple;
/**
@@ -35,5 +38,37 @@
return true;
}
return false;
+ }
+
+ /**
+ * Compares two bags as multisets, ignoring tuple order but respecting
+ * duplicate counts. The bags are equal when they have the same size and
+ * every tuple of {@code db1} can be paired with a distinct tuple of
+ * {@code db2} for which {@code compareTo} returns 0.
+ *
+ * @param db1 - first bag
+ * @param db2 - second bag
+ * @return true if both bags hold the same tuples with the same
+ * multiplicities
+ */
+ public static boolean compareBags(DataBag db1, DataBag db2) {
+ if (db1.size() != db2.size())
+ return false;
+
+ // Copy db2 so each tuple can be matched at most once. A plain
+ // containment check would wrongly accept {a,a,b} vs {a,b,b}.
+ List<Tuple> unmatched = new ArrayList<Tuple>();
+ for (Tuple t2 : db2) {
+ unmatched.add(t2);
+ }
+
+ for (Tuple t1 : db1) {
+ boolean found = false;
+ for (Iterator<Tuple> it = unmatched.iterator(); it.hasNext();) {
+ if (it.next().compareTo(t1) == 0) {
+ it.remove();
+ found = true;
+ break;
+ }
+ }
+ if (!found)
+ return false;
+ }
+ return unmatched.isEmpty();
+ }
+
+ /**
+ * Builds a new default bag in which each tuple holds only column
+ * {@code i} of the corresponding tuple in {@code db2}.
+ *
+ * @param db2 - bag to project
+ * @param i - index of the column to keep
+ * @return - a new bag of single-field tuples
+ * @throws IOException declared because reading a field (presumably via Tuple.get) may throw it
+ */
+ public static DataBag projectBag(DataBag db2, int i) throws IOException {
+ DataBag projected = DefaultBagFactory.getInstance().newDefaultBag();
+ for (Tuple row : db2) {
+ Object value = row.get(i);
+ Tuple single = new DefaultTuple();
+ single.append(value);
+ projected.add(single);
+ }
+ return projected;
}
}