You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/05/08 00:17:08 UTC
svn commit: r654303 - in /incubator/pig/branches/types: ./
src/org/apache/pig/ src/org/apache/pig/impl/physicalLayer/plans/
src/org/apache/pig/impl/physicalLayer/topLevelOperators/
test/org/apache/pig/test/
Author: gates
Date: Wed May 7 15:17:03 2008
New Revision: 654303
URL: http://svn.apache.org/viewvc?rev=654303&view=rev
Log:
Shubham's changes to add POUserFunc, PODistinct, POSort.
Added:
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PODistinct.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PORead.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POSort.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POUserComparisonFunc.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POUserFunc.java
incubator/pig/branches/types/test/org/apache/pig/test/TestPODistinct.java
incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java
incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java
Modified:
incubator/pig/branches/types/build.xml
incubator/pig/branches/types/src/org/apache/pig/ComparisonFunc.java
incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java
incubator/pig/branches/types/test/org/apache/pig/test/TestConstExpr.java
incubator/pig/branches/types/test/org/apache/pig/test/TestEqualTo.java
incubator/pig/branches/types/test/org/apache/pig/test/TestFilter.java
incubator/pig/branches/types/test/org/apache/pig/test/TestForEach.java
incubator/pig/branches/types/test/org/apache/pig/test/TestGTOrEqual.java
incubator/pig/branches/types/test/org/apache/pig/test/TestGreaterThan.java
incubator/pig/branches/types/test/org/apache/pig/test/TestLTOrEqual.java
incubator/pig/branches/types/test/org/apache/pig/test/TestLessThan.java
incubator/pig/branches/types/test/org/apache/pig/test/TestLoad.java
incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java
incubator/pig/branches/types/test/org/apache/pig/test/TestNotEqualTo.java
incubator/pig/branches/types/test/org/apache/pig/test/TestPackage.java
incubator/pig/branches/types/test/org/apache/pig/test/TestPhyOp.java
incubator/pig/branches/types/test/org/apache/pig/test/TestProject.java
incubator/pig/branches/types/test/org/apache/pig/test/TestStore.java
Modified: incubator/pig/branches/types/build.xml
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/build.xml?rev=654303&r1=654302&r2=654303&view=diff
==============================================================================
--- incubator/pig/branches/types/build.xml (original)
+++ incubator/pig/branches/types/build.xml Wed May 7 15:17:03 2008
@@ -142,9 +142,10 @@
**/test/TestEqualTo.java,**/test/TestNotEqualTo.java, **/test/TestPOGenerate.java,
**/test/TestProject.java, **/test/TestLoad.java, **/test/TestStore.java,
**/test/FakeFSOutputStream.java, **/test/TestPackage.java, **/test/TestForEach.java,
- **/test/TestLocalRearrange.java,
+ **/test/TestLocalRearrange.java, **/test/TestPOUserFunc.java,
+ **/test/TestPODistinct.java, **/test/TestPOSort.java,
**/test/FakeFSInputStream.java, **/test/Util.java,
- **/logicalLayer/*.java,
+ **/logicalLayer/*.java, **/logicalLayer/parser/NodeIdGenerator.java,
**/logicalLayer/schema/*.java, **/physicalLayer/topLevelOperators/*.java,
**/physicalLayer/topLevelOperators/**/*.java, **/physicalLayer/plans/*.java,
**/physicalLayer/Result.java,
@@ -252,6 +253,9 @@
<include name="**/TestEqualTo.java" />
<include name="**/TestNotEqualTo.java" />
<include name="**/TestPOGenerate.java" />
+ <include name="**/TestPOSort.java" />
+ <include name="**/TestPOUserFunc.java" />
+ <include name="**/TestPODistinct.java" />
<include name="**/TestLoad.java" />
<include name="**/TestStore.java" />
<include name="**/TestPackage.java" />
Modified: incubator/pig/branches/types/src/org/apache/pig/ComparisonFunc.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/ComparisonFunc.java?rev=654303&r1=654302&r2=654303&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/ComparisonFunc.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/ComparisonFunc.java Wed May 7 15:17:03 2008
@@ -22,11 +22,12 @@
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
public abstract class ComparisonFunc extends WritableComparator {
public ComparisonFunc() {
- super(Tuple.class);
+ super(TupleFactory.getInstance().tupleClass());
}
public int compare(WritableComparable a, WritableComparable b) {
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java?rev=654303&r1=654302&r2=654303&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/plans/PhyPlanVisitor.java Wed May 7 15:17:03 2008
@@ -21,11 +21,15 @@
import org.apache.pig.impl.physicalLayer.topLevelOperators.POGenerate;
//import org.apache.pig.impl.physicalLayer.topLevelOperators.POGenerate;
//import org.apache.pig.impl.physicalLayer.topLevelOperators.POGlobalRearrange;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PODistinct;
import org.apache.pig.impl.physicalLayer.topLevelOperators.POForEach;
import org.apache.pig.impl.physicalLayer.topLevelOperators.POLoad;
import org.apache.pig.impl.physicalLayer.topLevelOperators.POLocalRearrange;
import org.apache.pig.impl.physicalLayer.topLevelOperators.POPackage;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PORead;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POSort;
import org.apache.pig.impl.physicalLayer.topLevelOperators.POStore;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POUserFunc;
import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ExpressionOperator;
//import org.apache.pig.impl.physicalLayer.topLevelOperators.StartMap;
@@ -101,5 +105,21 @@
//do nothing
}
+ public void visitDistinct(PODistinct distinct) {
+
+ }
+
+ public void visitRead(PORead read) {
+
+ }
+
+ public void visitSort(POSort sort) {
+
+ }
+
+ public void visitUserFunc(POUserFunc userFunc) {
+
+ }
+
}
Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PODistinct.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PODistinct.java?rev=654303&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PODistinct.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PODistinct.java Wed May 7 15:17:03 2008
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.physicalLayer.topLevelOperators;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * This is a blocking operator. All the input is put in the hashset implemented
+ * in DistinctDataBag which also provides the other DataBag interfaces.
+ *
+ *
+ */
+public class PODistinct extends PhysicalOperator<PhyPlanVisitor> {
+
+ private boolean inputsAccumulated = false;
+ private DataBag distinctBag = BagFactory.getInstance().newDistinctBag();
+ private final Log log = LogFactory.getLog(getClass());
+ transient Iterator<Tuple> it;
+
+ public PODistinct(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+ super(k, rp, inp);
+ // TODO Auto-generated constructor stub
+ }
+
+ public PODistinct(OperatorKey k, int rp) {
+ super(k, rp);
+ // TODO Auto-generated constructor stub
+ }
+
+ public PODistinct(OperatorKey k, List<PhysicalOperator> inp) {
+ super(k, inp);
+ // TODO Auto-generated constructor stub
+ }
+
+ public PODistinct(OperatorKey k) {
+ super(k);
+ // TODO Auto-generated constructor stub
+ }
+
+ @Override
+ public boolean isBlocking() {
+ // TODO Auto-generated method stub
+ return true;
+ }
+
+ /**
+  * Returns the next distinct tuple.
+  * <p>
+  * The first call drains the input through {@code processInput()} into
+  * {@code distinctBag}; every call then steps the bag's iterator once.
+  *
+  * @param t not read; it only selects the Tuple-typed overload
+  * @return the operator's {@code res} holding STATUS_OK and the next tuple,
+  *         or STATUS_EOP when the bag is exhausted; if the input reports
+  *         STATUS_ERR that input result is returned unchanged
+  * @throws ExecException propagated from {@code processInput()}
+  */
+ @Override
+ public Result getNext(Tuple t) throws ExecException {
+     if (!inputsAccumulated) {
+         Result in = processInput();
+         while (in.returnStatus != POStatus.STATUS_EOP) {
+             if (in.returnStatus == POStatus.STATUS_ERR) {
+                 // Hand the failure to the caller. Re-testing the same
+                 // result (the old "continue") could never terminate.
+                 log.error("Error in reading from inputs");
+                 return in;
+             }
+             if (in.returnStatus != POStatus.STATUS_NULL) {
+                 distinctBag.add((Tuple) in.result);
+             }
+             // Always advance, including after a skipped null.
+             in = processInput();
+         }
+         inputsAccumulated = true;
+     }
+     if (it == null) {
+         it = distinctBag.iterator();
+     }
+     // Check hasNext() first: a standard Iterator throws from next() when
+     // exhausted. A null element is still treated as end of data, as before.
+     Tuple next = it.hasNext() ? it.next() : null;
+     res.result = next;
+     if (next == null)
+         res.returnStatus = POStatus.STATUS_EOP;
+     else
+         res.returnStatus = POStatus.STATUS_OK;
+     return res;
+ }
+
+ @Override
+ public String name() {
+ // TODO Auto-generated method stub
+ return "PODistinct - " + mKey.toString();
+ }
+
+ @Override
+ public boolean supportsMultipleInputs() {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public boolean supportsMultipleOutputs() {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public void visit(PhyPlanVisitor v) throws VisitorException {
+ // TODO Auto-generated method stub
+ v.visitDistinct(this);
+ }
+
+}
Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PORead.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PORead.java?rev=654303&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PORead.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/PORead.java Wed May 7 15:17:03 2008
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.physicalLayer.topLevelOperators;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * This operator is used to read tuples from a databag in memory. Used mostly
+ * for testing. It'd also be useful for the example generator
+ *
+ */
+public class PORead extends PhysicalOperator<PhyPlanVisitor> {
+
+ DataBag bag;
+ transient Iterator<Tuple> it;
+
+ public PORead(OperatorKey k) {
+ super(k);
+ // TODO Auto-generated constructor stub
+ }
+
+ public PORead(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+ super(k, rp, inp);
+ // TODO Auto-generated constructor stub
+ }
+
+ public PORead(OperatorKey k, int rp) {
+ super(k, rp);
+ // TODO Auto-generated constructor stub
+ }
+
+ public PORead(OperatorKey k, List<PhysicalOperator> inp) {
+ super(k, inp);
+ // TODO Auto-generated constructor stub
+ }
+
+ public PORead(OperatorKey k, DataBag bag) {
+ super(k);
+ this.bag = bag;
+ }
+
+ /**
+  * Hands out the tuples of the in-memory bag one at a time.
+  * <p>
+  * The iterator over {@code bag} is created on the first call.
+  *
+  * @param t not read; it only selects the Tuple-typed overload
+  * @return a new Result with STATUS_OK and the next tuple, or STATUS_EOP
+  *         once the iterator has no more elements
+  */
+ @Override
+ public Result getNext(Tuple t) {
+     if (it == null) {
+         it = bag.iterator();
+     }
+     final Result out = new Result();
+     if (!it.hasNext()) {
+         out.returnStatus = POStatus.STATUS_EOP;
+         return out;
+     }
+     out.returnStatus = POStatus.STATUS_OK;
+     out.result = it.next();
+     return out;
+ }
+
+ @Override
+ public String name() {
+ // TODO Auto-generated method stub
+ return "PORead - " + mKey.toString();
+ }
+
+ @Override
+ public boolean supportsMultipleInputs() {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public boolean supportsMultipleOutputs() {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ @Override
+ public void visit(PhyPlanVisitor v) throws VisitorException {
+ // TODO Auto-generated method stub
+ v.visitRead(this);
+ }
+
+}
Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POSort.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POSort.java?rev=654303&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POSort.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POSort.java Wed May 7 15:17:03 2008
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.physicalLayer.topLevelOperators;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.ExpressionOperator;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * This implementation is applicable for both the physical plan and for the
+ * local backend, as the conversion of physical to mapreduce would see the SORT
+ * operator and take necessary steps to convert it to a quantile and a sort job.
+ *
+ * This is a blocking operator. The sortedDataBag accumulates Tuples and sorts
+ * them only when an iterator is started. So all the tuples from the input
+ * operator should be accumulated and filled into the dataBag. The attachInput
+ * method is not applicable here.
+ *
+ *
+ */
+public class POSort extends PhysicalOperator<PhyPlanVisitor> {
+
+ //private List<Integer> mSortCols;
+ private List<ExprPlan> sortPlans;
+ private List<Byte> ExprOutputTypes;
+ private List<Boolean> mAscCols;
+ private POUserComparisonFunc mSortFunc;
+ private final Log log = LogFactory.getLog(getClass());
+
+ private boolean inputsAccumulated = false;
+ public boolean isUDFComparatorUsed = false;
+ private DataBag sortedBag;
+ transient Iterator<Tuple> it;
+
+ public POSort(OperatorKey k, int rp, List inp, List<ExprPlan> sortPlans,
+ List<Boolean> mAscCols, POUserFunc mSortFunc) {
+ super(k, rp, inp);
+ //this.mSortCols = mSortCols;
+ this.sortPlans = sortPlans;
+ this.mAscCols = mAscCols;
+ this.mSortFunc = (POUserComparisonFunc) mSortFunc;
+ if (mSortFunc == null) {
+ sortedBag = BagFactory.getInstance().newSortedBag(
+ new SortComparator());
+ ExprOutputTypes = new ArrayList<Byte>(sortPlans.size());
+
+ for(ExprPlan plan : sortPlans) {
+ ExprOutputTypes.add(plan.getLeaves().get(0).resultType);
+ }
+ } else {
+ sortedBag = BagFactory.getInstance().newSortedBag(
+ new UDFSortComparator());
+ isUDFComparatorUsed = true;
+ }
+ }
+
+ public POSort(OperatorKey k, int rp, List inp) {
+ super(k, rp, inp);
+
+ }
+
+ public POSort(OperatorKey k, int rp) {
+ super(k, rp);
+
+ }
+
+ public POSort(OperatorKey k, List inp) {
+ super(k, inp);
+
+ }
+
+ public POSort(OperatorKey k) {
+ super(k);
+
+ }
+
+ public class SortComparator implements Comparator<Tuple> {
+ /**
+  * Orders two tuples by the sort expressions, key by key.
+  * <p>
+  * Keys are evaluated in the order of {@code sortPlans}; the first key
+  * whose values differ decides the result. A key whose evaluation fails
+  * is logged and treated as equal so the remaining keys still apply.
+  *
+  * @param o1 first tuple
+  * @param o2 second tuple
+  * @return negative, zero or positive following {@code DataType.compare},
+  *         with the operands swapped for keys marked descending in
+  *         {@code mAscCols}; 0 when there are no sort plans
+  */
+ public int compare(Tuple o1, Tuple o2) {
+     if (sortPlans == null || sortPlans.size() == 0)
+         return 0;
+     int count = 0;
+     for (ExprPlan plan : sortPlans) {
+         // Fix this key's index up front so a failure on one key cannot
+         // shift the type/direction lookups of the keys after it.
+         final int idx = count++;
+         try {
+             plan.attachInput(o1);
+             Result res1 = getResult(plan, ExprOutputTypes.get(idx));
+             plan.attachInput(o2);
+             Result res2 = getResult(plan, ExprOutputTypes.get(idx));
+             // getResult yields null for result types it does not handle.
+             if (res1 == null || res2 == null
+                     || res1.returnStatus != POStatus.STATUS_OK
+                     || res2.returnStatus != POStatus.STATUS_OK) {
+                 log.error("Error processing the input in the expression plan : " + plan.toString());
+                 continue;
+             }
+             int ret;
+             if (mAscCols.get(idx))
+                 ret = DataType.compare(res1.result, res2.result);
+             else
+                 ret = DataType.compare(res2.result, res1.result);
+             // Stop at the first key that separates the tuples; later
+             // keys only break ties and must not overwrite this result.
+             if (ret != 0)
+                 return ret;
+         } catch (ExecException e) {
+             log.error("Invalid result while executing the expression plan : " + plan.toString() + "\n" + e.getMessage());
+         }
+     }
+     return 0;
+ }
+
+ private Result getResult(ExprPlan plan, byte resultType) throws ExecException {
+ ExpressionOperator Op = plan.getLeaves().get(0);
+ Result res = null;
+
+ switch (resultType) {
+ case DataType.BYTEARRAY:
+ res = Op.getNext(dummyDBA);
+ break;
+ case DataType.CHARARRAY:
+ res = Op.getNext(dummyString);
+ break;
+ case DataType.DOUBLE:
+ res = Op.getNext(dummyDouble);
+ break;
+ case DataType.FLOAT:
+ res = Op.getNext(dummyFloat);
+ break;
+ case DataType.INTEGER:
+ res = Op.getNext(dummyInt);
+ break;
+ case DataType.LONG:
+ res = Op.getNext(dummyLong);
+ break;
+ }
+ return res;
+ }
+ }
+
+ public class UDFSortComparator implements Comparator<Tuple> {
+
+ public int compare(Tuple t1, Tuple t2) {
+
+ mSortFunc.attachInput(t1, t2);
+ Integer i = null;
+ Result res = null;
+ try {
+ res = mSortFunc.getNext(i);
+ } catch (ExecException e) {
+
+ log.error("Input not ready. Error on reading from input. "
+ + e.getMessage());
+ }
+ if (res != null)
+ return (Integer) res.result;
+ else
+ return 0;
+ }
+
+ }
+
+ @Override
+ public String name() {
+
+ return "POSort - " + mKey.toString();
+ }
+
+ @Override
+ public boolean isBlocking() {
+
+ return true;
+ }
+
+ /**
+  * Returns the next tuple in sorted order.
+  * <p>
+  * The first call drains the input through {@code processInput()} into
+  * {@code sortedBag}; every call then steps the bag's iterator once.
+  *
+  * @param t not read; it only selects the Tuple-typed overload
+  * @return a new Result with STATUS_OK and the next tuple, or STATUS_EOP
+  *         when the bag is exhausted; if the input reports STATUS_ERR that
+  *         input result is returned unchanged
+  * @throws ExecException propagated from {@code processInput()}
+  */
+ @Override
+ public Result getNext(Tuple t) throws ExecException {
+     if (!inputsAccumulated) {
+         Result in = processInput();
+         while (in.returnStatus != POStatus.STATUS_EOP) {
+             if (in.returnStatus == POStatus.STATUS_ERR) {
+                 // Hand the failure to the caller. Re-testing the same
+                 // result (the old "continue") could never terminate.
+                 log.error("Error in reading from the inputs");
+                 return in;
+             }
+             if (in.returnStatus != POStatus.STATUS_NULL) {
+                 sortedBag.add((Tuple) in.result);
+             }
+             // Always advance, including after a skipped null.
+             in = processInput();
+         }
+         inputsAccumulated = true;
+     }
+     if (it == null) {
+         it = sortedBag.iterator();
+     }
+     // Build the answer in a fresh Result instead of overwriting the one
+     // handed back by the input.
+     Result res = new Result();
+     // Check hasNext() first: a standard Iterator throws from next() when
+     // exhausted. A null element is still treated as end of data, as before.
+     Tuple next = it.hasNext() ? it.next() : null;
+     res.result = next;
+     if (next == null)
+         res.returnStatus = POStatus.STATUS_EOP;
+     else
+         res.returnStatus = POStatus.STATUS_OK;
+     return res;
+ }
+
+ @Override
+ public boolean supportsMultipleInputs() {
+
+ return false;
+ }
+
+ @Override
+ public boolean supportsMultipleOutputs() {
+
+ return false;
+ }
+
+ @Override
+ public void visit(PhyPlanVisitor v) throws VisitorException {
+
+ v.visitSort(this);
+ }
+
+}
Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POUserComparisonFunc.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POUserComparisonFunc.java?rev=654303&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POUserComparisonFunc.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POUserComparisonFunc.java Wed May 7 15:17:03 2008
@@ -0,0 +1,124 @@
+package org.apache.pig.impl.physicalLayer.topLevelOperators;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.ComparisonFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+
+public class POUserComparisonFunc extends POUserFunc {
+
+ transient ComparisonFunc func;
+ private Log log = LogFactory.getLog(getClass());
+
+ public POUserComparisonFunc(OperatorKey k, int rp, List inp, String funcSpec, ComparisonFunc func) {
+ super(k, rp, inp);
+ this.funcSpec = funcSpec;
+ this.func = func;
+ }
+
+ public POUserComparisonFunc(OperatorKey k, int rp, List inp, String funcSpec) {
+ this(k, rp, inp, funcSpec, null);
+
+ instantiateFunc();
+ }
+
+ private void instantiateFunc() {
+ this.func = (ComparisonFunc) PigContext.instantiateFuncFromSpec(this.funcSpec);
+ }
+
+ public ComparisonFunc getComparator() {
+ if (func == null)
+ instantiateFunc();
+ return func;
+ }
+
+ @Override
+ public Result getNext(Integer i) throws ExecException {
+ Result result = new Result();
+
+ if (func == null)
+ instantiateFunc();
+
+
+ result.result = func.compare(t1, t2);
+ result.returnStatus = (t1 != null && t2 != null) ? POStatus.STATUS_OK
+ : POStatus.STATUS_ERR;
+ // the two attached tuples are used up now. So we set the
+ // inputAttached flag to false
+ inputAttached = false;
+ return result;
+
+ }
+
+ /**
+  * Fallback shared by every getNext overload other than the Integer one.
+  * A comparison function only produces Integer results, so any other
+  * request is logged and answered with an error status.
+  *
+  * @return a Result with STATUS_ERR and no value; never null, so callers
+  *         can inspect returnStatus without a null check
+  */
+ private Result getNext() {
+     log.error("getNext being called with non-integer");
+     Result res = new Result();
+     res.returnStatus = POStatus.STATUS_ERR;
+     return res;
+ }
+
+ @Override
+ public Result getNext(Boolean b) throws ExecException {
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(DataBag db) throws ExecException {
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(DataByteArray ba) throws ExecException {
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(Double d) throws ExecException {
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(Float f) throws ExecException {
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(Long l) throws ExecException {
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(Map m) throws ExecException {
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(String s) throws ExecException {
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(Tuple in) throws ExecException {
+ return getNext();
+ }
+
+ public void attachInput(Tuple t1, Tuple t2) {
+ if (func == null)
+ instantiateFunc();
+
+ this.t1 = t1;
+ this.t2 = t2;
+ inputAttached = true;
+
+ }
+
+}
Added: incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POUserFunc.java?rev=654303&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POUserFunc.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/physicalLayer/topLevelOperators/POUserFunc.java Wed May 7 15:17:03 2008
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.physicalLayer.topLevelOperators;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.Algebraic;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.impl.plan.VisitorException;
+
+public class POUserFunc extends PhysicalOperator<PhyPlanVisitor> {
+
+ transient EvalFunc func;
+ Tuple t1, t2;
+ private final Log log = LogFactory.getLog(getClass());
+ String funcSpec;
+ private final byte INITIAL = 0;
+ private final byte INTERMEDIATE = 1;
+ private final byte FINAL = 2;
+
+ public POUserFunc(OperatorKey k, int rp, List inp) {
+ super(k, rp);
+ inputs = inp;
+
+ }
+
+ public POUserFunc(OperatorKey k, int rp, List inp, String funcSpec) {
+ this(k, rp, inp, funcSpec, null);
+
+ instantiateFunc();
+ }
+
+ public POUserFunc(OperatorKey k, int rp, List inp, String funcSpec, EvalFunc func) {
+ super(k, rp, inp);
+ this.funcSpec = funcSpec;
+ this.func = func;
+
+ }
+
+ private void instantiateFunc() {
+ this.func = (EvalFunc) PigContext.instantiateFuncFromSpec(this.funcSpec);
+ }
+
+ /**
+  * Runs the user function once and wraps its output; shared by all typed
+  * getNext overloads.
+  * <p>
+  * With an attached input the function is applied to {@code input}.
+  * Otherwise one tuple is pulled from the first input operator.
+  *
+  * @return a new Result: STATUS_OK with the function output; STATUS_EOP
+  *         when an attached-input call produces null; the input's own
+  *         status when the pulled input is not STATUS_OK; STATUS_ERR when
+  *         the function throws IOException (logged)
+  * @throws ExecException propagated from the input operator
+  */
+ private Result getNext() throws ExecException {
+     Result result = new Result();
+     // instantiate the function if its null
+     if (func == null)
+         instantiateFunc();
+
+     try {
+         if (inputAttached) {
+             result.result = func.exec(input);
+             result.returnStatus = (result.result != null) ? POStatus.STATUS_OK
+                     : POStatus.STATUS_EOP;
+             return result;
+         }
+         // The null Tuple only selects the Tuple-typed overload.
+         Tuple t = null;
+         Result in = inputs.get(0).getNext(t);
+         if (in.returnStatus != POStatus.STATUS_OK) {
+             // Pass end-of-input, error and null statuses through rather
+             // than running the function on a result that holds no tuple
+             // and reporting STATUS_OK for it.
+             result.returnStatus = in.returnStatus;
+             return result;
+         }
+         result.result = func.exec((Tuple) in.result);
+         result.returnStatus = POStatus.STATUS_OK;
+         return result;
+     } catch (IOException e) {
+         // Best effort: log the failure and report it through the status.
+         log.error(e);
+     }
+     result.returnStatus = POStatus.STATUS_ERR;
+     return result;
+ }
+
+ @Override
+ public Result getNext(Tuple tIn) throws ExecException {
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(DataBag db) throws ExecException {
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(Integer i) throws ExecException {
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(Boolean b) throws ExecException {
+
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(DataByteArray ba) throws ExecException {
+
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(Double d) throws ExecException {
+
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(Float f) throws ExecException {
+
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(Long l) throws ExecException {
+
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(Map m) throws ExecException {
+
+ return getNext();
+ }
+
+ @Override
+ public Result getNext(String s) throws ExecException {
+
+ return getNext();
+ }
+
+ /**
+  * Replaces {@code func} with the initial, intermediate or final stage
+  * of an algebraic function and updates the result type to match.
+  * <p>
+  * Intended for the optimizer, so it can place the right stage in the
+  * mapper, combiner and reducer without swapping operators in the
+  * physical plan. Because {@code func} itself is replaced, this cannot be
+  * called twice on the same algebraic EvalFunc.
+  *
+  * @param Function one of INITIAL, INTERMEDIATE or FINAL; any other
+  *        value leaves the operator unchanged
+  */
+ public void setAlgebraicFunction(Byte Function) {
+     final String stageSpec;
+     switch (Function) {
+     case INITIAL:
+         stageSpec = getInitial();
+         break;
+     case INTERMEDIATE:
+         stageSpec = getIntermed();
+         break;
+     case FINAL:
+         stageSpec = getFinal();
+         break;
+     default:
+         return;
+     }
+     func = (EvalFunc) PigContext.instantiateFuncFromSpec(stageSpec);
+     setResultType(DataType.findType(func.getReturnType()));
+ }
+
+ public String getInitial() {
+ if (func == null)
+ instantiateFunc();
+
+ if (func instanceof Algebraic) {
+ return ((Algebraic) func).getInitial();
+ } else {
+ log
+ .error("Attempt to run a non-algebraic function as an algebraic function");
+ }
+ return null;
+ }
+
+ public String getIntermed() {
+ if (func == null)
+ instantiateFunc();
+
+ if (func instanceof Algebraic) {
+ return ((Algebraic) func).getIntermed();
+ } else {
+ log
+ .error("Attempt to run a non-algebraic function as an algebraic function");
+ }
+ return null;
+ }
+
+ public String getFinal() {
+ if (func == null)
+ instantiateFunc();
+
+ if (func instanceof Algebraic) {
+ return ((Algebraic) func).getFinal();
+ } else {
+ log
+ .error("Attempt to run a non-algebraic function as an algebraic function");
+ }
+ return null;
+ }
+
+ public Type getReturnType() {
+ if (func == null)
+ instantiateFunc();
+
+ return func.getReturnType();
+ }
+
+ public void finish() {
+ if (func == null)
+ instantiateFunc();
+
+ func.finish();
+ }
+
+ public Schema outputSchema(Schema input) {
+ if (func == null)
+ instantiateFunc();
+
+ return func.outputSchema(input);
+ }
+
+ public Boolean isAsynchronous() {
+ if (func == null)
+ instantiateFunc();
+
+ return func.isAsynchronous();
+ }
+
+ @Override
+ public String name() {
+
+ return "POUserFunc - " + mKey.toString();
+ }
+
+ @Override
+ public boolean supportsMultipleInputs() {
+
+ return false;
+ }
+
+ @Override
+ public boolean supportsMultipleOutputs() {
+
+ return false;
+ }
+
+ @Override
+ public void visit(PhyPlanVisitor v) throws VisitorException {
+
+ v.visitUserFunc(this);
+ }
+
+}
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestConstExpr.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestConstExpr.java?rev=654303&r1=654302&r2=654303&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestConstExpr.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestConstExpr.java Wed May 7 15:17:03 2008
@@ -34,7 +34,7 @@
import org.junit.Before;
import org.junit.Test;
-public class TestConstExpr {
+public class TestConstExpr extends junit.framework.TestCase {
Random r = new Random();
ConstantExpression ce = (ConstantExpression) GenPhyOp.exprConst();
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestEqualTo.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestEqualTo.java?rev=654303&r1=654302&r2=654303&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestEqualTo.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestEqualTo.java Wed May 7 15:17:03 2008
@@ -36,7 +36,7 @@
import org.junit.Before;
import org.junit.Test;
-public class TestEqualTo {
+public class TestEqualTo extends junit.framework.TestCase {
@Before
public void setUp() throws Exception {
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestFilter.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestFilter.java?rev=654303&r1=654302&r2=654303&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestFilter.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestFilter.java Wed May 7 15:17:03 2008
@@ -42,7 +42,7 @@
import org.junit.Before;
import org.junit.Test;
-public class TestFilter {
+public class TestFilter extends junit.framework.TestCase {
POFilter pass;
POFilter fail;
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestForEach.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestForEach.java?rev=654303&r1=654302&r2=654303&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestForEach.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestForEach.java Wed May 7 15:17:03 2008
@@ -52,7 +52,7 @@
* as those in the projected bag
*
*/
-public class TestForEach {
+public class TestForEach extends junit.framework.TestCase {
POForEach fe;
Tuple t;
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestGTOrEqual.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestGTOrEqual.java?rev=654303&r1=654302&r2=654303&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestGTOrEqual.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestGTOrEqual.java Wed May 7 15:17:03 2008
@@ -37,7 +37,7 @@
import org.junit.Before;
import org.junit.Test;
-public class TestGTOrEqual {
+public class TestGTOrEqual extends junit.framework.TestCase {
@Before
public void setUp() throws Exception {
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestGreaterThan.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestGreaterThan.java?rev=654303&r1=654302&r2=654303&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestGreaterThan.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestGreaterThan.java Wed May 7 15:17:03 2008
@@ -37,7 +37,7 @@
import static org.junit.Assert.*;
-public class TestGreaterThan {
+public class TestGreaterThan extends junit.framework.TestCase {
@Before
public void setUp() throws Exception {
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestLTOrEqual.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLTOrEqual.java?rev=654303&r1=654302&r2=654303&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestLTOrEqual.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestLTOrEqual.java Wed May 7 15:17:03 2008
@@ -37,7 +37,7 @@
import org.junit.Before;
import org.junit.Test;
-public class TestLTOrEqual {
+public class TestLTOrEqual extends junit.framework.TestCase {
@Before
public void setUp() throws Exception {
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestLessThan.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLessThan.java?rev=654303&r1=654302&r2=654303&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestLessThan.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestLessThan.java Wed May 7 15:17:03 2008
@@ -37,7 +37,7 @@
import org.junit.Before;
import org.junit.Test;
-public class TestLessThan {
+public class TestLessThan extends junit.framework.TestCase {
@Before
public void setUp() throws Exception {
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestLoad.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLoad.java?rev=654303&r1=654302&r2=654303&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestLoad.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestLoad.java Wed May 7 15:17:03 2008
@@ -41,7 +41,7 @@
import org.junit.Before;
import org.junit.Test;
-public class TestLoad {
+public class TestLoad extends junit.framework.TestCase {
FileSpec inpFSpec;
POLoad ld;
PigContext pc;
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java?rev=654303&r1=654302&r2=654303&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestLocalRearrange.java Wed May 7 15:17:03 2008
@@ -47,7 +47,7 @@
* group db by $0
*
*/
-public class TestLocalRearrange {
+public class TestLocalRearrange extends junit.framework.TestCase {
POLocalRearrange lr;
Tuple t;
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestNotEqualTo.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestNotEqualTo.java?rev=654303&r1=654302&r2=654303&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestNotEqualTo.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestNotEqualTo.java Wed May 7 15:17:03 2008
@@ -36,7 +36,7 @@
import org.junit.Before;
import org.junit.Test;
-public class TestNotEqualTo {
+public class TestNotEqualTo extends junit.framework.TestCase {
@Before
public void setUp() throws Exception {
Added: incubator/pig/branches/types/test/org/apache/pig/test/TestPODistinct.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPODistinct.java?rev=654303&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPODistinct.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPODistinct.java Wed May 7 15:17:03 2008
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PODistinct;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PORead;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
+import org.junit.Before;
+import org.junit.Test;
+
+ /**
+  * Unit test for {@link PODistinct}. A bag of {@code MAX_SAMPLES} single-field
+  * tuples holding random integers below {@code MAX_VALUE} is fed through the
+  * operator, and the output is checked to contain every input value exactly
+  * once.
+  */
+ public class TestPODistinct extends TestCase {
+     DataBag input = BagFactory.getInstance().newDefaultBag();
+     Random r = new Random();
+     final int MAX_VALUE = 10;
+     final int MAX_SAMPLES = 100;
+
+     /** Fills {@code input} with {@code MAX_SAMPLES} random one-field tuples. */
+     @Before
+     public void setUp() {
+         TupleFactory tf = TupleFactory.getInstance();
+         for (int i = 0; i < MAX_SAMPLES; i++) {
+             Tuple t = tf.newTuple();
+             t.append(r.nextInt(MAX_VALUE));
+             input.add(t);
+         }
+     }
+
+     /**
+      * Drains the operator and verifies that no tuple is emitted twice, that
+      * every input tuple shows up in the output, and that nothing is emitted
+      * that was not in the input.
+      */
+     @Test
+     public void testPODistict() throws ExecException {
+         PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
+         List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+         inputs.add(read);
+         PODistinct distinct = new PODistinct(new OperatorKey("", r.nextLong()),
+                 -1, inputs);
+
+         // Count how many times each tuple comes out of the operator.
+         Map<Tuple, Integer> output = new HashMap<Tuple, Integer>();
+         Tuple t = null;
+         for (Result res = distinct.getNext(t);
+                 res.returnStatus != POStatus.STATUS_EOP;
+                 res = distinct.getNext(t)) {
+             t = (Tuple) res.result;
+             Integer seen = output.get(t);
+             output.put(t, seen == null ? 1 : seen + 1);
+         }
+
+         for (Map.Entry<Tuple, Integer> e : output.entrySet()) {
+             assertEquals("tuple emitted more than once: " + e.getKey(), 1,
+                     e.getValue().intValue());
+         }
+
+         // Without these checks the test would pass even if the operator
+         // emitted nothing at all or silently dropped some values.
+         Map<Tuple, Boolean> expected = new HashMap<Tuple, Boolean>();
+         for (java.util.Iterator<Tuple> it = input.iterator(); it.hasNext();) {
+             Tuple in = it.next();
+             expected.put(in, Boolean.TRUE);
+             assertTrue("input tuple missing from output: " + in,
+                     output.containsKey(in));
+         }
+         assertEquals("unexpected tuples in output", expected.keySet(),
+                 output.keySet());
+     }
+
+ }
Added: incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java?rev=654303&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java Wed May 7 15:17:03 2008
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.pig.ComparisonFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.plans.ExprPlan;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PORead;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POSort;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POUserComparisonFunc;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POUserFunc;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.expressionOperators.POProject;
+import org.apache.pig.test.utils.GenRandomData;
+import org.junit.Test;
+
+public class TestPOSort extends TestCase {
+ Random r = new Random();
+ int MAX_TUPLES = 10;
+
+ @Test
+ public void testPOSortAscString() throws ExecException {
+ DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
+ MAX_TUPLES, 100);
+ List<ExprPlan> sortPlans = new LinkedList<ExprPlan>();
+ POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+ pr1.setResultType(DataType.CHARARRAY);
+ ExprPlan expPlan = new ExprPlan();
+ expPlan.add(pr1);
+ sortPlans.add(expPlan);
+ List<Boolean> mAscCols = new LinkedList<Boolean>();
+ mAscCols.add(true);
+ PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
+ List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+ inputs.add(read);
+ POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
+ sortPlans, mAscCols, null);
+ Tuple t = null;
+ Result res1 = sort.getNext(t);
+ // System.out.println(res1.result);
+ Result res2 = sort.getNext(t);
+ while (res2.returnStatus != POStatus.STATUS_EOP) {
+ Object i1 = ((Tuple) res1.result).get(0);
+ Object i2 = ((Tuple) res2.result).get(0);
+ int i = DataType.compare(i1, i2);
+ // System.out.println(res2.result + " i = " + i);
+ assertEquals(true, (i <= 0));
+ res1 = res2;
+ res2 = sort.getNext(t);
+ }
+ }
+
+ @Test
+ public void testPOSortDescString() throws ExecException {
+ DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
+ MAX_TUPLES, 100);
+ List<ExprPlan> sortPlans = new LinkedList<ExprPlan>();
+ POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+ pr1.setResultType(DataType.CHARARRAY);
+ ExprPlan expPlan = new ExprPlan();
+ expPlan.add(pr1);
+ sortPlans.add(expPlan);
+ List<Boolean> mAscCols = new LinkedList<Boolean>();
+ mAscCols.add(false);
+ PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
+ List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+ inputs.add(read);
+ POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
+ sortPlans, mAscCols, null);
+ Tuple t = null;
+ Result res1 = sort.getNext(t);
+ // System.out.println(res1.result);
+ Result res2 = sort.getNext(t);
+ while (res2.returnStatus != POStatus.STATUS_EOP) {
+ Object i1 = ((Tuple) res1.result).get(0);
+ Object i2 = ((Tuple) res2.result).get(0);
+ int i = DataType.compare(i1, i2);
+ // System.out.println(res2.result + " i = " + i);
+ assertEquals(true, (i >= 0));
+ res1 = res2;
+ res2 = sort.getNext(t);
+ }
+ }
+
+ @Test
+ public void testPOSortAsc() throws ExecException {
+ DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
+ MAX_TUPLES, 100);
+ List<ExprPlan> sortPlans = new LinkedList<ExprPlan>();
+ POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
+ pr1.setResultType(DataType.INTEGER);
+ ExprPlan expPlan = new ExprPlan();
+ expPlan.add(pr1);
+ sortPlans.add(expPlan);
+ List<Boolean> mAscCols = new LinkedList<Boolean>();
+ mAscCols.add(true);
+ PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
+ List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+ inputs.add(read);
+ POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
+ sortPlans, mAscCols, null);
+ Tuple t = null;
+ Result res1 = sort.getNext(t);
+ // System.out.println(res1.result);
+ Result res2 = sort.getNext(t);
+ while (res2.returnStatus != POStatus.STATUS_EOP) {
+ Object i1 = ((Tuple) res1.result).get(1);
+ Object i2 = ((Tuple) res2.result).get(1);
+ int i = DataType.compare(i1, i2);
+ assertEquals(true, (i <= 0));
+ // System.out.println(res2.result);
+ res1 = res2;
+ res2 = sort.getNext(t);
+ }
+ }
+
+ @Test
+ public void testPOSortDesc() throws ExecException {
+ DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
+ MAX_TUPLES, 100);
+ List<ExprPlan> sortPlans = new LinkedList<ExprPlan>();
+ POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
+ pr1.setResultType(DataType.INTEGER);
+ ExprPlan expPlan = new ExprPlan();
+ expPlan.add(pr1);
+ sortPlans.add(expPlan);
+ List<Boolean> mAscCols = new LinkedList<Boolean>();
+ mAscCols.add(false);
+ PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
+ List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+ inputs.add(read);
+ POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
+ sortPlans, mAscCols, null);
+ Tuple t = null;
+ Result res1 = sort.getNext(t);
+ // System.out.println(res1.result);
+ Result res2 = sort.getNext(t);
+ while (res2.returnStatus != POStatus.STATUS_EOP) {
+ Object i1 = ((Tuple) res1.result).get(1);
+ Object i2 = ((Tuple) res2.result).get(1);
+ int i = DataType.compare(i1, i2);
+ assertEquals(true, (i >= 0));
+ // System.out.println(res2.result);
+ res1 = res2;
+ res2 = sort.getNext(t);
+ }
+ }
+
+ @Test
+ public void testPOSortUDF() throws ExecException {
+ DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
+ MAX_TUPLES, 100);
+ PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
+ List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+ inputs.add(read);
+ String funcName = WeirdComparator.class.getName() + "()";
+ /*POUserFunc comparator = new POUserFunc(
+ new OperatorKey("", r.nextLong()), -1, inputs, funcName);*/
+ POUserFunc comparator = new POUserComparisonFunc(
+ new OperatorKey("", r.nextLong()), -1, null, funcName);
+ POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
+ null, null, comparator);
+ Tuple t = null;
+ Result res1 = sort.getNext(t);
+ // System.out.println(res1.result);
+ Result res2 = sort.getNext(t);
+ while (res2.returnStatus != POStatus.STATUS_EOP) {
+ int i1 = (Integer) ((Tuple) res1.result).get(1);
+ int i2 = (Integer) ((Tuple) res2.result).get(1);
+ int i = (i1 - 50) * (i1 - 50) - (i2 - 50) * (i2 - 50);
+ assertEquals(true, (i <= 0));
+ System.out.println(i + " : " + res2.result);
+ res1 = res2;
+ res2 = sort.getNext(t);
+ }
+ }
+
+ // sorts values in ascending order of their distance from 50
+ public static class WeirdComparator extends ComparisonFunc {
+
+ @Override
+ public int compare(Tuple t1, Tuple t2) {
+ // TODO Auto-generated method stub
+ int result = 0;
+ try {
+ int i1 = (Integer) t1.get(1);
+ int i2 = (Integer) t2.get(1);
+ result = (i1 - 50) * (i1 - 50) - (i2 - 50) * (i2 - 50);
+ } catch (ExecException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ return result;
+ }
+
+ }
+}
Added: incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java?rev=654303&view=auto
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java (added)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java Wed May 7 15:17:03 2008
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.pig.Algebraic;
+import org.apache.pig.ComparisonFunc;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.OperatorKey;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.physicalLayer.POStatus;
+import org.apache.pig.impl.physicalLayer.Result;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PORead;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POUserComparisonFunc;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.POUserFunc;
+import org.apache.pig.impl.physicalLayer.topLevelOperators.PhysicalOperator;
+import org.apache.pig.test.utils.GenRandomData;
+import org.junit.Test;
+
+public class TestPOUserFunc extends TestCase {
+ Random r = new Random();
+ int MAX_TUPLES = 10;
+
+ /** Test UDF that returns the number of fields in its input tuple. */
+ public static class ARITY extends EvalFunc<Integer> {
+
+     /**
+      * @param input the tuple to measure
+      * @return {@code input.size()} boxed as an {@link Integer}
+      */
+     @Override
+     public Integer exec(Tuple input) throws IOException {
+         // valueOf uses the Integer cache instead of always allocating.
+         return Integer.valueOf(input.size());
+     }
+
+     /** Schema support is not implemented yet; always returns {@code null}. */
+     @Override
+     public Schema outputSchema(Schema input) {
+         // TODO FIX
+         // return new AtomSchema("arity");
+         return null;
+     }
+ }
+
+ /**
+  * Test comparator that orders tuples by the square of (field 2 minus 2).
+  */
+ public static class WeirdComparator extends ComparisonFunc {
+
+     /**
+      * @return {@code (a - 2)^2 - (b - 2)^2} where {@code a} and {@code b}
+      *         are the integer values of field 2 of {@code t1} and {@code t2}
+      * @throws RuntimeException if field 2 of either tuple cannot be read;
+      *         previously the {@link ExecException} was only printed and the
+      *         method then failed with a NullPointerException on unboxing
+      */
+     @Override
+     public int compare(Tuple t1, Tuple t2) {
+         try {
+             int i1 = (Integer) t1.get(2) - 2;
+             int i2 = (Integer) t2.get(2) - 2;
+             return i1 * i1 - i2 * i2;
+         } catch (ExecException e) {
+             throw new RuntimeException("Unable to read field 2 of " + t1
+                     + " or " + t2, e);
+         }
+     }
+
+ }
+
+ /**
+  * Generates the average of the values of the first field of a tuple. This
+  * class is Algebraic in implementation, so if possible the execution will be
+  * split into a local and global application.
+  *
+  * <p>Failures while reading the input are rethrown as
+  * {@link RuntimeException} with the original {@link ExecException} as the
+  * cause, so a broken input fails the test instead of yielding 0 or null.
+  */
+ public static class AVG extends EvalFunc<Double> implements Algebraic {
+
+     private static TupleFactory mTupleFactory = TupleFactory.getInstance();
+
+     /**
+      * Averages the first field of every tuple in the bag held in field 0
+      * of {@code input}.
+      *
+      * @return the average, or 0.0 when the bag is empty
+      */
+     @Override
+     public Double exec(Tuple input) throws IOException {
+         try {
+             return average(sum(input), count(input));
+         } catch (ExecException e) {
+             throw new RuntimeException(e.getMessage() + ": " + input, e);
+         }
+     }
+
+     public String getInitial() {
+         return Initial.class.getName();
+     }
+
+     public String getIntermed() {
+         return Intermed.class.getName();
+     }
+
+     public String getFinal() {
+         return Final.class.getName();
+     }
+
+     /** Produces a (sum, count) tuple for the bag in field 0 of the input. */
+     static public class Initial extends EvalFunc<Tuple> {
+         @Override
+         public Tuple exec(Tuple input) throws IOException {
+             try {
+                 Tuple t = mTupleFactory.newTuple(2);
+                 t.set(0, Double.valueOf(sum(input)));
+                 t.set(1, Long.valueOf(count(input)));
+                 return t;
+             } catch (ExecException e) {
+                 throw new RuntimeException(e.getMessage() + ": " + input, e);
+             }
+         }
+     }
+
+     /** Merges a bag of (sum, count) tuples into a single (sum, count) tuple. */
+     static public class Intermed extends EvalFunc<Tuple> {
+         @Override
+         public Tuple exec(Tuple input) throws IOException {
+             try {
+                 DataBag b = (DataBag) input.get(0);
+                 return combine(b);
+             } catch (ExecException e) {
+                 throw new RuntimeException(e.getMessage() + ": " + input, e);
+             }
+         }
+     }
+
+     /** Merges a bag of (sum, count) tuples and returns the resulting average. */
+     static public class Final extends EvalFunc<Double> {
+         @Override
+         public Double exec(Tuple input) throws IOException {
+             try {
+                 DataBag b = (DataBag) input.get(0);
+                 Tuple combined = combine(b);
+
+                 double sum = (Double) combined.get(0);
+                 long count = (Long) combined.get(1);
+                 return average(sum, count);
+             } catch (ExecException e) {
+                 throw new RuntimeException(e.getMessage() + ": " + input, e);
+             }
+         }
+     }
+
+     /** Divides {@code sum} by {@code count}; yields 0.0 when {@code count} is not positive. */
+     private static Double average(double sum, long count) {
+         double avg = 0;
+         if (count > 0) {
+             avg = sum / count;
+         }
+         return Double.valueOf(avg);
+     }
+
+     /**
+      * Adds up the partial sums (field 0, Double) and partial counts
+      * (field 1, Long) of every tuple in {@code values}.
+      *
+      * @return a two-field tuple holding the total sum and the total count
+      */
+     static protected Tuple combine(DataBag values) throws ExecException {
+         double sum = 0;
+         long count = 0;
+
+         for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
+             Tuple t = it.next();
+             sum += (Double) t.get(0);
+             count += (Long) t.get(1);
+         }
+
+         Tuple output = mTupleFactory.newTuple(2);
+         output.set(0, Double.valueOf(sum));
+         output.set(1, Long.valueOf(count));
+         return output;
+     }
+
+     /** Returns the size of the bag stored in field 0 of {@code input}. */
+     static protected long count(Tuple input) throws ExecException {
+         DataBag values = (DataBag) input.get(0);
+         return values.size();
+     }
+
+     /**
+      * Sums field 0 of every tuple in the bag stored in field 0 of
+      * {@code input}, skipping values that {@code DataType.toDouble}
+      * reports as {@code null}.
+      */
+     static protected double sum(Tuple input) throws ExecException {
+         DataBag values = (DataBag) input.get(0);
+
+         double sum = 0;
+         for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
+             Tuple t = it.next();
+             Double d = DataType.toDouble(t.get(0));
+             if (d != null) {
+                 sum += d;
+             }
+         }
+
+         return sum;
+     }
+
+     /** Schema support is not implemented yet; always returns {@code null}. */
+     @Override
+     public Schema outputSchema(Schema input) {
+         // TODO FIX
+         // return new AtomSchema("average");
+         return null;
+     }
+
+ }
+
+ /**
+  * Runs {@link ARITY} through a {@link POUserFunc} fed by a {@link PORead}
+  * over a random bag and expects every result to be 2.
+  */
+ @Test
+ public void testUserFuncArity() throws ExecException {
+     DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
+             MAX_TUPLES, 100);
+     String funcSpec = ARITY.class.getName() + "()";
+     PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
+     List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+     inputs.add(read);
+     POUserFunc userFunc = new POUserFunc(new OperatorKey("", r.nextLong()),
+             -1, inputs, funcSpec);
+
+     // The typed null is only passed as the getNext argument, as in the
+     // other tests; the former throwaway "new Result()" is gone.
+     Integer i = null;
+     for (Result res = userFunc.getNext(i);
+             res.returnStatus != POStatus.STATUS_EOP;
+             res = userFunc.getNext(i)) {
+         int result = (Integer) res.result;
+         assertEquals("arity of a generated tuple", 2, result);
+     }
+ }
+
+ /**
+  * Feeds two tuples whose appended field 2 is 2 and 3 to
+  * {@link WeirdComparator} via {@link POUserComparisonFunc} and expects -1.
+  */
+ @Test
+ public void testUDFCompare() throws ExecException {
+     DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r, 2,
+             100);
+     String funcSpec = WeirdComparator.class.getName() + "()";
+     POUserFunc userFunc = new POUserComparisonFunc(
+             new OperatorKey("", r.nextLong()), -1, null, funcSpec);
+
+     Iterator<Tuple> tuples = input.iterator();
+     Tuple first = tuples.next();
+     Tuple second = tuples.next();
+     first.append(2);
+     second.append(3);
+
+     ((POUserComparisonFunc) userFunc).attachInput(first, second);
+     Integer i = null;
+     Result compared = userFunc.getNext(i);
+     int result = (Integer) (compared.result);
+     assertEquals(-1, result);
+ }
+
+ /**
+  * Drives the three algebraic stages of {@link AVG} through
+  * {@link POUserFunc}: the initial stage on two identical inputs of 1..10,
+  * then the intermediate and final stages on a bag of both initial results.
+  *
+  * <p>The second initial result is now really computed from {@code tup2}.
+  * Before, it was read from the same {@code Result} as the first, so the
+  * equality assertion could never fail.
+  */
+ @Test
+ public void testAlgebraicAVG() throws IOException, ExecException {
+     int input[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
+     byte INIT = 0;
+     byte INTERMED = 1;
+     byte FINAL = 2;
+     Tuple tup1 = Util.loadNestTuple(TupleFactory.getInstance().newTuple(1),
+             input);
+     Tuple tup2 = Util.loadNestTuple(TupleFactory.getInstance().newTuple(1),
+             input);
+     String funcSpec = AVG.class.getName() + "()";
+
+     TupleFactory tf = TupleFactory.getInstance();
+
+     Tuple outputInitial1 = (Tuple) runAlgebraicStage(funcSpec, INIT, tup1).result;
+     Tuple outputInitial2 = (Tuple) runAlgebraicStage(funcSpec, INIT, tup2).result;
+     assertEquals(outputInitial1, outputInitial2);
+     double sum = (Double) outputInitial1.get(0);
+     long count = (Long) outputInitial1.get(1);
+     assertEquals(55.0, sum);
+     assertEquals(10, count);
+
+     // Both later stages consume a tuple whose single field is the bag of
+     // initial (sum, count) results.
+     DataBag bag = BagFactory.getInstance().newDefaultBag();
+     bag.add(outputInitial1);
+     bag.add(outputInitial2);
+     Tuple outputInitial = tf.newTuple();
+     outputInitial.append(bag);
+
+     Tuple outputIntermed = (Tuple) runAlgebraicStage(funcSpec, INTERMED,
+             outputInitial).result;
+     sum = (Double) outputIntermed.get(0);
+     count = (Long) outputIntermed.get(1);
+     assertEquals(110.0, sum);
+     assertEquals(20, count);
+
+     Double output = (Double) runAlgebraicStage(funcSpec, FINAL,
+             outputInitial).result;
+     assertEquals(5.5, output);
+ }
+
+ /**
+  * Runs one algebraic stage of {@code funcSpec} on {@code stageInput} with a
+  * fresh {@link POUserFunc} and asserts that it finished with
+  * {@code STATUS_OK}, so a failed stage shows up as an assertion failure
+  * rather than a later NullPointerException.
+  */
+ private Result runAlgebraicStage(String funcSpec, byte stage,
+         Tuple stageInput) throws IOException, ExecException {
+     POUserFunc po = new POUserFunc(new OperatorKey("", r.nextLong()), -1,
+             null, funcSpec);
+     po.setAlgebraicFunction(stage);
+     po.attachInput(stageInput);
+     Tuple t = null;
+     Result res = po.getNext(t);
+     assertEquals("status of algebraic stage " + stage,
+             POStatus.STATUS_OK, res.returnStatus);
+     return res;
+ }
+}
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPackage.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPackage.java?rev=654303&r1=654302&r2=654303&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPackage.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPackage.java Wed May 7 15:17:03 2008
@@ -39,7 +39,7 @@
import org.junit.Before;
import org.junit.Test;
-public class TestPackage {
+public class TestPackage extends junit.framework.TestCase {
@Before
public void setUp() throws Exception {
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPhyOp.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPhyOp.java?rev=654303&r1=654302&r2=654303&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPhyOp.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPhyOp.java Wed May 7 15:17:03 2008
@@ -37,7 +37,7 @@
import org.junit.Before;
import org.junit.Test;
-public class TestPhyOp {
+public class TestPhyOp extends junit.framework.TestCase {
PhysicalOperator<PhyPlanVisitor> op;
PhysicalOperator<PhyPlanVisitor> inpOp;
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestProject.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestProject.java?rev=654303&r1=654302&r2=654303&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestProject.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestProject.java Wed May 7 15:17:03 2008
@@ -37,7 +37,7 @@
import org.junit.Before;
import org.junit.Test;
-public class TestProject {
+public class TestProject extends junit.framework.TestCase {
Random r;
Tuple t;
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestStore.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestStore.java?rev=654303&r1=654302&r2=654303&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestStore.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestStore.java Wed May 7 15:17:03 2008
@@ -47,7 +47,7 @@
import org.junit.Before;
import org.junit.Test;
-public class TestStore {
+public class TestStore extends junit.framework.TestCase {
POStore st;
FileSpec fSpec;
DataBag inpDB;