You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/05/14 00:52:03 UTC
svn commit: r656043 [3/3] - in /incubator/pig/branches/types:
lib-src/bzip2/org/apache/tools/bzip2r/ lib-src/shock/org/apache/pig/shock/
src/org/apache/pig/builtin/ src/org/apache/pig/impl/logicalLayer/
src/org/apache/pig/impl/logicalLayer/validators/ ...
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/Transformer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/Transformer.java?rev=656043&r1=656042&r2=656043&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/Transformer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/plan/optimizer/Transformer.java Tue May 13 15:52:02 2008
@@ -38,26 +38,26 @@
super(plan, walker);
}
- /**
- * check if the transform should be done. If this is being called then
- * the pattern matches, but there may be other criteria that must be met
- * as well.
- * @param nodes - List of nodes declared in transform ($1 = nodes[0],
- * etc.) Remember that somes entries in node[] may be NULL since they may
- * not be created until after the transform.
- * @returns - true if the transform should be done.
- */
- public abstract boolean check(List<O> nodes);
+ /**
+ * check if the transform should be done. If this is being called then
+ * the pattern matches, but there may be other criteria that must be met
+ * as well.
+ * @param nodes - List of nodes declared in transform ($1 = nodes[0],
+ * etc.) Remember that somes entries in node[] may be NULL since they may
+ * not be created until after the transform.
+ * @returns - true if the transform should be done.
+ */
+ public abstract boolean check(List<O> nodes);
- /**
- * Transform the tree
- * @param nodes - List of nodes declared in transform ($1 = nodes[0],
- * etc.) This call must destruct any nodes that are being removed as part
- * of the transform and remove them from the nodes vector and construct
- * any that are being created as part of the transform and add them at the
- * appropriate point to the nodes vector.
- */
- public abstract void transform(List<O> nodes);
+ /**
+ * Transform the tree
+ * @param nodes - List of nodes declared in transform ($1 = nodes[0],
+ * etc.) This call must destruct any nodes that are being removed as part
+ * of the transform and remove them from the nodes vector and construct
+ * any that are being created as part of the transform and add them at the
+ * appropriate point to the nodes vector.
+ */
+ public abstract void transform(List<O> nodes);
}
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=656043&r1=656042&r2=656043&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java Tue May 13 15:52:02 2008
@@ -694,12 +694,12 @@
}
}
- @Test
+ @Test
public void testQueryFail17(){
buildPlan("a = load 'a' as (url, host, rank);");
buildPlan("b = group a by url; ");
try {
- LogicalPlan lp = buildPlan("c = foreach b generate group.url;");
+ LogicalPlan lp = buildPlan("c = foreach b generate group.url;");
} catch (AssertionFailedError e) {
assertTrue(e.getMessage().contains("Exception"));
}
@@ -713,7 +713,7 @@
//buildPlan("b = group a by 2*3;");
//String query = "d = foreach b generate group;";
buildPlan(query);
- buildPlan("e = foreach a generate name, details;");
+ buildPlan("e = foreach a generate name, details;");
}
@Test
@@ -723,15 +723,15 @@
buildPlan("b = group a by details;");
String query = "d = foreach b generate group.age;";
buildPlan(query);
- buildPlan("e = foreach a generate name, details;");
- buildPlan("f = LOAD 'myfile' AS (garage: bag{num_tools: integer}, links: bag{websites: chararray}, page: bag{something_stupid: tuple(yeah_double: double)}, coordinates: bag{another_tuple: tuple(ok_float: float, bite_the_array: bytearray), bag_of_unknown: bag{}});");
+ buildPlan("e = foreach a generate name, details;");
+ buildPlan("f = LOAD 'myfile' AS (garage: bag{num_tools: integer}, links: bag{websites: chararray}, page: bag{something_stupid: tuple(yeah_double: double)}, coordinates: bag{another_tuple: tuple(ok_float: float, bite_the_array: bytearray), bag_of_unknown: bag{}});");
}
@Test
public void testQueryFail18() {
String query = "foreach (load 'myfile' as (col1, col2 : (sub1, sub2), col3 : (bag1))) generate col1 ;";
try {
- buildPlan(query);
+ buildPlan(query);
} catch (AssertionFailedError e) {
assertTrue(e.getMessage().contains("Exception"));
}
@@ -744,7 +744,7 @@
String query = "c = cross a,b;";
buildPlan(query);
try {
- buildPlan("d = order c by name, b::name, height, a::gpa;");
+ buildPlan("d = order c by name, b::name, height, a::gpa;");
} catch (AssertionFailedError e) {
assertTrue(e.getMessage().contains("Exception"));
}
@@ -783,7 +783,7 @@
//Just the top level roots and their children
//Need a recursive one to travel down the tree
- /*
+ /*
for(LogicalOperator op: lp.getRoots()) {
System.err.println("Logical Plan Root: " + op.getClass().getName() + " object " + op);
@@ -797,7 +797,7 @@
}
}
}
- */
+ */
assertTrue(lp != null);
return lp;
} catch (IOException e) {
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPOBinCond.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOBinCond.java?rev=656043&r1=656042&r2=656043&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPOBinCond.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOBinCond.java Tue May 13 15:52:02 2008
@@ -40,68 +40,68 @@
import junit.framework.TestCase;
public class TestPOBinCond extends TestCase {
- Random r = new Random();
- DataBag bag = BagFactory.getInstance().newDefaultBag();
- final int MAX = 10;
-
- @Before
- @Override
- public void setUp() {
- for(int i = 0; i < 10; i ++) {
- Tuple t = TupleFactory.getInstance().newTuple();
- t.append(r.nextInt(2));
- t.append(0);
- t.append(1);
- bag.add(t);
- }
- }
-
- public void testPOBinCond() throws ExecException, PlanException {
- ConstantExpression rt = (ConstantExpression) GenPhyOp.exprConst();
- rt.setValue(1);
- rt.setResultType(DataType.INTEGER);
-
- POProject prj1 = GenPhyOp.exprProject();
- prj1.setColumn(0);
- prj1.setResultType(DataType.INTEGER);
-
- EqualToExpr equal = (EqualToExpr) GenPhyOp.compEqualToExpr();
- equal.setLhs(prj1);
- equal.setRhs(rt);
-
- POProject prjLhs = GenPhyOp.exprProject();
- prjLhs.setResultType(DataType.INTEGER);
- prjLhs.setColumn(1);
-
- POProject prjRhs = GenPhyOp.exprProject();
- prjRhs.setResultType(DataType.INTEGER);
- prjRhs.setColumn(2);
-
- POBinCond op = new POBinCond(new OperatorKey("", r.nextLong()), -1, equal, prjLhs, prjRhs);
- op.setResultType(DataType.INTEGER);
-
- ExprPlan plan = new ExprPlan();
- plan.add(op);
- plan.add(prjLhs);
- plan.add(prjRhs);
- plan.add(equal);
- plan.connect(equal, op);
- plan.connect(prjLhs, op);
- plan.connect(prjRhs, op);
-
- plan.add(prj1);
- plan.add(rt);
- plan.connect(prj1, equal);
- plan.connect(rt, equal);
-
- for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
- Tuple t = it.next();
- plan.attachInput(t);
- Integer i = (Integer) t.get(0);
- assertEquals(1, i | (Integer)op.getNext(i).result);
-// System.out.println(t + " " + op.getNext(i).result.toString());
- }
-
-
- }
+ Random r = new Random();
+ DataBag bag = BagFactory.getInstance().newDefaultBag();
+ final int MAX = 10;
+
+ @Before
+ @Override
+ public void setUp() {
+ for(int i = 0; i < 10; i ++) {
+ Tuple t = TupleFactory.getInstance().newTuple();
+ t.append(r.nextInt(2));
+ t.append(0);
+ t.append(1);
+ bag.add(t);
+ }
+ }
+
+ public void testPOBinCond() throws ExecException, PlanException {
+ ConstantExpression rt = (ConstantExpression) GenPhyOp.exprConst();
+ rt.setValue(1);
+ rt.setResultType(DataType.INTEGER);
+
+ POProject prj1 = GenPhyOp.exprProject();
+ prj1.setColumn(0);
+ prj1.setResultType(DataType.INTEGER);
+
+ EqualToExpr equal = (EqualToExpr) GenPhyOp.compEqualToExpr();
+ equal.setLhs(prj1);
+ equal.setRhs(rt);
+
+ POProject prjLhs = GenPhyOp.exprProject();
+ prjLhs.setResultType(DataType.INTEGER);
+ prjLhs.setColumn(1);
+
+ POProject prjRhs = GenPhyOp.exprProject();
+ prjRhs.setResultType(DataType.INTEGER);
+ prjRhs.setColumn(2);
+
+ POBinCond op = new POBinCond(new OperatorKey("", r.nextLong()), -1, equal, prjLhs, prjRhs);
+ op.setResultType(DataType.INTEGER);
+
+ ExprPlan plan = new ExprPlan();
+ plan.add(op);
+ plan.add(prjLhs);
+ plan.add(prjRhs);
+ plan.add(equal);
+ plan.connect(equal, op);
+ plan.connect(prjLhs, op);
+ plan.connect(prjRhs, op);
+
+ plan.add(prj1);
+ plan.add(rt);
+ plan.connect(prj1, equal);
+ plan.connect(rt, equal);
+
+ for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+ Tuple t = it.next();
+ plan.attachInput(t);
+ Integer i = (Integer) t.get(0);
+ assertEquals(1, i | (Integer)op.getNext(i).result);
+// System.out.println(t + " " + op.getNext(i).result.toString());
+ }
+
+
+ }
}
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPODistinct.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPODistinct.java?rev=656043&r1=656042&r2=656043&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPODistinct.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPODistinct.java Tue May 13 15:52:02 2008
@@ -41,49 +41,49 @@
import org.junit.Test;
public class TestPODistinct extends TestCase {
- DataBag input = BagFactory.getInstance().newDefaultBag();
- Random r = new Random();
- final int MAX_VALUE = 10;
- final int MAX_SAMPLES = 100;
+ DataBag input = BagFactory.getInstance().newDefaultBag();
+ Random r = new Random();
+ final int MAX_VALUE = 10;
+ final int MAX_SAMPLES = 100;
- @Before
- public void setUp() {
- TupleFactory tf = TupleFactory.getInstance();
- for (int i = 0; i < MAX_SAMPLES; i++) {
- Tuple t = tf.newTuple();
- t.append(r.nextInt(MAX_VALUE));
- input.add(t);
- // System.out.println(t);
- }
- // System.out.println();
- }
+ @Before
+ public void setUp() {
+ TupleFactory tf = TupleFactory.getInstance();
+ for (int i = 0; i < MAX_SAMPLES; i++) {
+ Tuple t = tf.newTuple();
+ t.append(r.nextInt(MAX_VALUE));
+ input.add(t);
+ // System.out.println(t);
+ }
+ // System.out.println();
+ }
- @Test
- public void testPODistict() throws ExecException {
- PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
- List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
- inputs.add(read);
- PODistinct distinct = new PODistinct(new OperatorKey("", r.nextLong()),
- -1, inputs);
- Map<Tuple, Integer> output = new HashMap<Tuple, Integer>();
- Tuple t = null;
- Result res = distinct.getNext(t);
- t = (Tuple) res.result;
- while (res.returnStatus != POStatus.STATUS_EOP) {
- if (output.containsKey(t)) {
- int i = output.get(t);
- output.put(t, ++i);
- } else {
- output.put(t, 1);
- }
- res = distinct.getNext(t);
- t = (Tuple) res.result;
- }
- for (Map.Entry<Tuple, Integer> e : output.entrySet()) {
- int i = e.getValue();
- // System.out.println(e.getKey());
- assertEquals(1, i);
- }
- }
+ @Test
+ public void testPODistict() throws ExecException {
+ PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
+ List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+ inputs.add(read);
+ PODistinct distinct = new PODistinct(new OperatorKey("", r.nextLong()),
+ -1, inputs);
+ Map<Tuple, Integer> output = new HashMap<Tuple, Integer>();
+ Tuple t = null;
+ Result res = distinct.getNext(t);
+ t = (Tuple) res.result;
+ while (res.returnStatus != POStatus.STATUS_EOP) {
+ if (output.containsKey(t)) {
+ int i = output.get(t);
+ output.put(t, ++i);
+ } else {
+ output.put(t, 1);
+ }
+ res = distinct.getNext(t);
+ t = (Tuple) res.result;
+ }
+ for (Map.Entry<Tuple, Integer> e : output.entrySet()) {
+ int i = e.getValue();
+ // System.out.println(e.getKey());
+ assertEquals(1, i);
+ }
+ }
}
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPONegative.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPONegative.java?rev=656043&r1=656042&r2=656043&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPONegative.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPONegative.java Tue May 13 15:52:02 2008
@@ -36,113 +36,113 @@
import junit.framework.TestCase;
public class TestPONegative extends TestCase {
-
- DataBag bag = BagFactory.getInstance().newDefaultBag();
- Random r = new Random();
- TupleFactory tf = TupleFactory.getInstance();
- final int MAX = 10;
-
- public void testPONegInt () throws PlanException, ExecException {
- for(int i = 0; i < MAX; i++) {
- Tuple t = tf.newTuple();
- t.append(r.nextInt());
- bag.add(t);
- }
-
- POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
- prj.setResultType(DataType.INTEGER);
- PONegative pn = new PONegative(new OperatorKey("", r.nextLong()), -1, prj);
- pn.setResultType(DataType.INTEGER);
-
- ExprPlan plan = new ExprPlan();
- plan.add(prj); plan.add(pn);
- plan.connect(prj, pn);
-
- for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
- Tuple t = it.next();
- plan.attachInput(t);
- Integer expected = -(Integer)t.get(0);
- int output = (Integer) pn.getNext(expected).result;
- assertEquals(expected.intValue(), output);
- }
-
- }
-
- public void testPONegLong () throws PlanException, ExecException {
- for(int i = 0; i < MAX; i++) {
- Tuple t = tf.newTuple();
- t.append(r.nextLong());
- bag.add(t);
- }
-
- POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
- prj.setResultType(DataType.LONG);
- PONegative pn = new PONegative(new OperatorKey("", r.nextLong()), -1, prj);
- pn.setResultType(DataType.LONG);
-
- ExprPlan plan = new ExprPlan();
- plan.add(prj); plan.add(pn);
- plan.connect(prj, pn);
-
- for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
- Tuple t = it.next();
- plan.attachInput(t);
- Long expected = -(Long)t.get(0);
- long output = (Long) pn.getNext(expected).result;
- assertEquals(expected.longValue(), output);
- }
-
- }
-
- public void testPONegDouble() throws PlanException, ExecException {
- for(int i = 0; i < MAX; i++) {
- Tuple t = tf.newTuple();
- t.append(r.nextDouble());
- bag.add(t);
- }
-
- POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
- prj.setResultType(DataType.DOUBLE);
- PONegative pn = new PONegative(new OperatorKey("", r.nextLong()), -1, prj);
- pn.setResultType(DataType.DOUBLE);
-
- ExprPlan plan = new ExprPlan();
- plan.add(prj); plan.add(pn);
- plan.connect(prj, pn);
-
- for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
- Tuple t = it.next();
- plan.attachInput(t);
- Double expected = -(Double)t.get(0);
- double output = (Double) pn.getNext(expected).result;
- assertEquals(expected.doubleValue(), output);
- }
-
- }
-
- public void testPONegFloat() throws PlanException, ExecException {
- for(int i = 0; i < MAX; i++) {
- Tuple t = tf.newTuple();
- t.append(r.nextFloat());
- bag.add(t);
- }
-
- POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
- prj.setResultType(DataType.FLOAT);
- PONegative pn = new PONegative(new OperatorKey("", r.nextLong()), -1, prj);
- pn.setResultType(DataType.FLOAT);
-
- ExprPlan plan = new ExprPlan();
- plan.add(prj); plan.add(pn);
- plan.connect(prj, pn);
-
- for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
- Tuple t = it.next();
- plan.attachInput(t);
- Float expected = -(Float)t.get(0);
- float output = (Float) pn.getNext(expected).result;
- assertEquals(expected.floatValue(), output);
- }
-
- }
+
+ DataBag bag = BagFactory.getInstance().newDefaultBag();
+ Random r = new Random();
+ TupleFactory tf = TupleFactory.getInstance();
+ final int MAX = 10;
+
+ public void testPONegInt () throws PlanException, ExecException {
+ for(int i = 0; i < MAX; i++) {
+ Tuple t = tf.newTuple();
+ t.append(r.nextInt());
+ bag.add(t);
+ }
+
+ POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+ prj.setResultType(DataType.INTEGER);
+ PONegative pn = new PONegative(new OperatorKey("", r.nextLong()), -1, prj);
+ pn.setResultType(DataType.INTEGER);
+
+ ExprPlan plan = new ExprPlan();
+ plan.add(prj); plan.add(pn);
+ plan.connect(prj, pn);
+
+ for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+ Tuple t = it.next();
+ plan.attachInput(t);
+ Integer expected = -(Integer)t.get(0);
+ int output = (Integer) pn.getNext(expected).result;
+ assertEquals(expected.intValue(), output);
+ }
+
+ }
+
+ public void testPONegLong () throws PlanException, ExecException {
+ for(int i = 0; i < MAX; i++) {
+ Tuple t = tf.newTuple();
+ t.append(r.nextLong());
+ bag.add(t);
+ }
+
+ POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+ prj.setResultType(DataType.LONG);
+ PONegative pn = new PONegative(new OperatorKey("", r.nextLong()), -1, prj);
+ pn.setResultType(DataType.LONG);
+
+ ExprPlan plan = new ExprPlan();
+ plan.add(prj); plan.add(pn);
+ plan.connect(prj, pn);
+
+ for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+ Tuple t = it.next();
+ plan.attachInput(t);
+ Long expected = -(Long)t.get(0);
+ long output = (Long) pn.getNext(expected).result;
+ assertEquals(expected.longValue(), output);
+ }
+
+ }
+
+ public void testPONegDouble() throws PlanException, ExecException {
+ for(int i = 0; i < MAX; i++) {
+ Tuple t = tf.newTuple();
+ t.append(r.nextDouble());
+ bag.add(t);
+ }
+
+ POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+ prj.setResultType(DataType.DOUBLE);
+ PONegative pn = new PONegative(new OperatorKey("", r.nextLong()), -1, prj);
+ pn.setResultType(DataType.DOUBLE);
+
+ ExprPlan plan = new ExprPlan();
+ plan.add(prj); plan.add(pn);
+ plan.connect(prj, pn);
+
+ for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+ Tuple t = it.next();
+ plan.attachInput(t);
+ Double expected = -(Double)t.get(0);
+ double output = (Double) pn.getNext(expected).result;
+ assertEquals(expected.doubleValue(), output);
+ }
+
+ }
+
+ public void testPONegFloat() throws PlanException, ExecException {
+ for(int i = 0; i < MAX; i++) {
+ Tuple t = tf.newTuple();
+ t.append(r.nextFloat());
+ bag.add(t);
+ }
+
+ POProject prj = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+ prj.setResultType(DataType.FLOAT);
+ PONegative pn = new PONegative(new OperatorKey("", r.nextLong()), -1, prj);
+ pn.setResultType(DataType.FLOAT);
+
+ ExprPlan plan = new ExprPlan();
+ plan.add(prj); plan.add(pn);
+ plan.connect(prj, pn);
+
+ for(Iterator<Tuple> it = bag.iterator(); it.hasNext(); ) {
+ Tuple t = it.next();
+ plan.attachInput(t);
+ Float expected = -(Float)t.get(0);
+ float output = (Float) pn.getNext(expected).result;
+ assertEquals(expected.floatValue(), output);
+ }
+
+ }
}
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java?rev=656043&r1=656042&r2=656043&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOSort.java Tue May 13 15:52:02 2008
@@ -42,183 +42,183 @@
import org.junit.Test;
public class TestPOSort extends TestCase {
- Random r = new Random();
- int MAX_TUPLES = 10;
+ Random r = new Random();
+ int MAX_TUPLES = 10;
- @Test
- public void testPOSortAscString() throws ExecException {
- DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
- MAX_TUPLES, 100);
- List<ExprPlan> sortPlans = new LinkedList<ExprPlan>();
- POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
- pr1.setResultType(DataType.CHARARRAY);
- ExprPlan expPlan = new ExprPlan();
- expPlan.add(pr1);
- sortPlans.add(expPlan);
- List<Boolean> mAscCols = new LinkedList<Boolean>();
- mAscCols.add(true);
- PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
- List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
- inputs.add(read);
- POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
- sortPlans, mAscCols, null);
- Tuple t = null;
- Result res1 = sort.getNext(t);
- // System.out.println(res1.result);
- Result res2 = sort.getNext(t);
- while (res2.returnStatus != POStatus.STATUS_EOP) {
- Object i1 = ((Tuple) res1.result).get(0);
- Object i2 = ((Tuple) res2.result).get(0);
- int i = DataType.compare(i1, i2);
- // System.out.println(res2.result + " i = " + i);
- assertEquals(true, (i <= 0));
- res1 = res2;
- res2 = sort.getNext(t);
- }
- }
-
- @Test
- public void testPOSortDescString() throws ExecException {
- DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
- MAX_TUPLES, 100);
- List<ExprPlan> sortPlans = new LinkedList<ExprPlan>();
- POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
- pr1.setResultType(DataType.CHARARRAY);
- ExprPlan expPlan = new ExprPlan();
- expPlan.add(pr1);
- sortPlans.add(expPlan);
- List<Boolean> mAscCols = new LinkedList<Boolean>();
- mAscCols.add(false);
- PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
- List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
- inputs.add(read);
- POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
- sortPlans, mAscCols, null);
- Tuple t = null;
- Result res1 = sort.getNext(t);
- // System.out.println(res1.result);
- Result res2 = sort.getNext(t);
- while (res2.returnStatus != POStatus.STATUS_EOP) {
- Object i1 = ((Tuple) res1.result).get(0);
- Object i2 = ((Tuple) res2.result).get(0);
- int i = DataType.compare(i1, i2);
- // System.out.println(res2.result + " i = " + i);
- assertEquals(true, (i >= 0));
- res1 = res2;
- res2 = sort.getNext(t);
- }
- }
-
- @Test
- public void testPOSortAsc() throws ExecException {
- DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
- MAX_TUPLES, 100);
- List<ExprPlan> sortPlans = new LinkedList<ExprPlan>();
- POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
- pr1.setResultType(DataType.INTEGER);
- ExprPlan expPlan = new ExprPlan();
- expPlan.add(pr1);
- sortPlans.add(expPlan);
- List<Boolean> mAscCols = new LinkedList<Boolean>();
- mAscCols.add(true);
- PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
- List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
- inputs.add(read);
- POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
- sortPlans, mAscCols, null);
- Tuple t = null;
- Result res1 = sort.getNext(t);
- // System.out.println(res1.result);
- Result res2 = sort.getNext(t);
- while (res2.returnStatus != POStatus.STATUS_EOP) {
- Object i1 = ((Tuple) res1.result).get(1);
- Object i2 = ((Tuple) res2.result).get(1);
- int i = DataType.compare(i1, i2);
- assertEquals(true, (i <= 0));
- // System.out.println(res2.result);
- res1 = res2;
- res2 = sort.getNext(t);
- }
- }
-
- @Test
- public void testPOSortDesc() throws ExecException {
- DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
- MAX_TUPLES, 100);
- List<ExprPlan> sortPlans = new LinkedList<ExprPlan>();
- POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
- pr1.setResultType(DataType.INTEGER);
- ExprPlan expPlan = new ExprPlan();
- expPlan.add(pr1);
- sortPlans.add(expPlan);
- List<Boolean> mAscCols = new LinkedList<Boolean>();
- mAscCols.add(false);
- PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
- List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
- inputs.add(read);
- POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
- sortPlans, mAscCols, null);
- Tuple t = null;
- Result res1 = sort.getNext(t);
- // System.out.println(res1.result);
- Result res2 = sort.getNext(t);
- while (res2.returnStatus != POStatus.STATUS_EOP) {
- Object i1 = ((Tuple) res1.result).get(1);
- Object i2 = ((Tuple) res2.result).get(1);
- int i = DataType.compare(i1, i2);
- assertEquals(true, (i >= 0));
- // System.out.println(res2.result);
- res1 = res2;
- res2 = sort.getNext(t);
- }
- }
-
- @Test
- public void testPOSortUDF() throws ExecException {
- DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
- MAX_TUPLES, 100);
- PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
- List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
- inputs.add(read);
- String funcName = WeirdComparator.class.getName() + "()";
- /*POUserFunc comparator = new POUserFunc(
- new OperatorKey("", r.nextLong()), -1, inputs, funcName);*/
- POUserFunc comparator = new POUserComparisonFunc(
- new OperatorKey("", r.nextLong()), -1, null, funcName);
- POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
- null, null, comparator);
- Tuple t = null;
- Result res1 = sort.getNext(t);
- // System.out.println(res1.result);
- Result res2 = sort.getNext(t);
- while (res2.returnStatus != POStatus.STATUS_EOP) {
- int i1 = (Integer) ((Tuple) res1.result).get(1);
- int i2 = (Integer) ((Tuple) res2.result).get(1);
- int i = (i1 - 50) * (i1 - 50) - (i2 - 50) * (i2 - 50);
- assertEquals(true, (i <= 0));
- System.out.println(i + " : " + res2.result);
- res1 = res2;
- res2 = sort.getNext(t);
- }
- }
-
- // sorts values in ascending order of their distance from 50
- public static class WeirdComparator extends ComparisonFunc {
-
- @Override
- public int compare(Tuple t1, Tuple t2) {
- // TODO Auto-generated method stub
- int result = 0;
- try {
- int i1 = (Integer) t1.get(1);
- int i2 = (Integer) t2.get(1);
- result = (i1 - 50) * (i1 - 50) - (i2 - 50) * (i2 - 50);
- } catch (ExecException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- return result;
- }
+ @Test
+ public void testPOSortAscString() throws ExecException {
+ DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
+ MAX_TUPLES, 100);
+ List<ExprPlan> sortPlans = new LinkedList<ExprPlan>();
+ POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+ pr1.setResultType(DataType.CHARARRAY);
+ ExprPlan expPlan = new ExprPlan();
+ expPlan.add(pr1);
+ sortPlans.add(expPlan);
+ List<Boolean> mAscCols = new LinkedList<Boolean>();
+ mAscCols.add(true);
+ PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
+ List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+ inputs.add(read);
+ POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
+ sortPlans, mAscCols, null);
+ Tuple t = null;
+ Result res1 = sort.getNext(t);
+ // System.out.println(res1.result);
+ Result res2 = sort.getNext(t);
+ while (res2.returnStatus != POStatus.STATUS_EOP) {
+ Object i1 = ((Tuple) res1.result).get(0);
+ Object i2 = ((Tuple) res2.result).get(0);
+ int i = DataType.compare(i1, i2);
+ // System.out.println(res2.result + " i = " + i);
+ assertEquals(true, (i <= 0));
+ res1 = res2;
+ res2 = sort.getNext(t);
+ }
+ }
+
+ @Test
+ public void testPOSortDescString() throws ExecException {
+ DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
+ MAX_TUPLES, 100);
+ List<ExprPlan> sortPlans = new LinkedList<ExprPlan>();
+ POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 0);
+ pr1.setResultType(DataType.CHARARRAY);
+ ExprPlan expPlan = new ExprPlan();
+ expPlan.add(pr1);
+ sortPlans.add(expPlan);
+ List<Boolean> mAscCols = new LinkedList<Boolean>();
+ mAscCols.add(false);
+ PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
+ List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+ inputs.add(read);
+ POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
+ sortPlans, mAscCols, null);
+ Tuple t = null;
+ Result res1 = sort.getNext(t);
+ // System.out.println(res1.result);
+ Result res2 = sort.getNext(t);
+ while (res2.returnStatus != POStatus.STATUS_EOP) {
+ Object i1 = ((Tuple) res1.result).get(0);
+ Object i2 = ((Tuple) res2.result).get(0);
+ int i = DataType.compare(i1, i2);
+ // System.out.println(res2.result + " i = " + i);
+ assertEquals(true, (i >= 0));
+ res1 = res2;
+ res2 = sort.getNext(t);
+ }
+ }
+
+ @Test
+ public void testPOSortAsc() throws ExecException {
+ DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
+ MAX_TUPLES, 100);
+ List<ExprPlan> sortPlans = new LinkedList<ExprPlan>();
+ POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
+ pr1.setResultType(DataType.INTEGER);
+ ExprPlan expPlan = new ExprPlan();
+ expPlan.add(pr1);
+ sortPlans.add(expPlan);
+ List<Boolean> mAscCols = new LinkedList<Boolean>();
+ mAscCols.add(true);
+ PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
+ List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+ inputs.add(read);
+ POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
+ sortPlans, mAscCols, null);
+ Tuple t = null;
+ Result res1 = sort.getNext(t);
+ // System.out.println(res1.result);
+ Result res2 = sort.getNext(t);
+ while (res2.returnStatus != POStatus.STATUS_EOP) {
+ Object i1 = ((Tuple) res1.result).get(1);
+ Object i2 = ((Tuple) res2.result).get(1);
+ int i = DataType.compare(i1, i2);
+ assertEquals(true, (i <= 0));
+ // System.out.println(res2.result);
+ res1 = res2;
+ res2 = sort.getNext(t);
+ }
+ }
+
+ @Test
+ public void testPOSortDesc() throws ExecException {
+ DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
+ MAX_TUPLES, 100);
+ List<ExprPlan> sortPlans = new LinkedList<ExprPlan>();
+ POProject pr1 = new POProject(new OperatorKey("", r.nextLong()), -1, 1);
+ pr1.setResultType(DataType.INTEGER);
+ ExprPlan expPlan = new ExprPlan();
+ expPlan.add(pr1);
+ sortPlans.add(expPlan);
+ List<Boolean> mAscCols = new LinkedList<Boolean>();
+ mAscCols.add(false);
+ PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
+ List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+ inputs.add(read);
+ POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
+ sortPlans, mAscCols, null);
+ Tuple t = null;
+ Result res1 = sort.getNext(t);
+ // System.out.println(res1.result);
+ Result res2 = sort.getNext(t);
+ while (res2.returnStatus != POStatus.STATUS_EOP) {
+ Object i1 = ((Tuple) res1.result).get(1);
+ Object i2 = ((Tuple) res2.result).get(1);
+ int i = DataType.compare(i1, i2);
+ assertEquals(true, (i >= 0));
+ // System.out.println(res2.result);
+ res1 = res2;
+ res2 = sort.getNext(t);
+ }
+ }
+
+ @Test
+ public void testPOSortUDF() throws ExecException {
+ DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
+ MAX_TUPLES, 100);
+ PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
+ List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+ inputs.add(read);
+ String funcName = WeirdComparator.class.getName() + "()";
+ /*POUserFunc comparator = new POUserFunc(
+ new OperatorKey("", r.nextLong()), -1, inputs, funcName);*/
+ POUserFunc comparator = new POUserComparisonFunc(
+ new OperatorKey("", r.nextLong()), -1, null, funcName);
+ POSort sort = new POSort(new OperatorKey("", r.nextLong()), -1, inputs,
+ null, null, comparator);
+ Tuple t = null;
+ Result res1 = sort.getNext(t);
+ // System.out.println(res1.result);
+ Result res2 = sort.getNext(t);
+ while (res2.returnStatus != POStatus.STATUS_EOP) {
+ int i1 = (Integer) ((Tuple) res1.result).get(1);
+ int i2 = (Integer) ((Tuple) res2.result).get(1);
+ int i = (i1 - 50) * (i1 - 50) - (i2 - 50) * (i2 - 50);
+ assertEquals(true, (i <= 0));
+ System.out.println(i + " : " + res2.result);
+ res1 = res2;
+ res2 = sort.getNext(t);
+ }
+ }
+
+ // sorts values in ascending order of their distance from 50
+ public static class WeirdComparator extends ComparisonFunc {
+
+ @Override
+ public int compare(Tuple t1, Tuple t2) {
+ // TODO Auto-generated method stub
+ int result = 0;
+ try {
+ int i1 = (Integer) t1.get(1);
+ int i2 = (Integer) t2.get(1);
+ result = (i1 - 50) * (i1 - 50) - (i2 - 50) * (i2 - 50);
+ } catch (ExecException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ return result;
+ }
- }
+ }
}
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java?rev=656043&r1=656042&r2=656043&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestPOUserFunc.java Tue May 13 15:52:02 2008
@@ -47,286 +47,286 @@
import org.junit.Test;
public class TestPOUserFunc extends TestCase {
- Random r = new Random();
- int MAX_TUPLES = 10;
+ Random r = new Random();
+ int MAX_TUPLES = 10;
- public static class ARITY extends EvalFunc<Integer> {
+ public static class ARITY extends EvalFunc<Integer> {
- @Override
- public Integer exec(Tuple input) throws IOException {
- return new Integer(input.size());
- }
-
- @Override
- public Schema outputSchema(Schema input) {
- // TODO FIX
- // return new AtomSchema("arity");
- return null;
- }
- }
-
- public static class WeirdComparator extends ComparisonFunc {
-
- @Override
- public int compare(Tuple t1, Tuple t2) {
- // TODO Auto-generated method stub
- Object o1 = null;
- Object o2 = null;
- try {
- o1 = t1.get(2);
- o2 = t2.get(2);
- } catch (ExecException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- int i1 = (Integer) o1 - 2;
- int i2 = (Integer) o2 - 2;
-
- return (int) (i1 * i1 - i2 * i2);
- }
-
- }
-
- /**
- * Generates the average of the values of the first field of a tuple. This
- * class is Algebraic in implemenation, so if possible the execution will be
- * split into a local and global application
- */
- public static class AVG extends EvalFunc<Double> implements Algebraic {
-
- private static TupleFactory mTupleFactory = TupleFactory.getInstance();
-
- @Override
- public Double exec(Tuple input) throws IOException {
- double sum = 0;
- double count = 0;
-
- try {
- sum = sum(input);
- count = count(input);
- } catch (ExecException e) {
- e.printStackTrace();
- }
-
- double avg = 0;
- if (count > 0)
- avg = sum / count;
-
- return new Double(avg);
- }
-
- public String getInitial() {
- return Initial.class.getName();
- }
-
- public String getIntermed() {
- return Intermed.class.getName();
- }
-
- public String getFinal() {
- return Final.class.getName();
- }
-
- static public class Initial extends EvalFunc<Tuple> {
- @Override
- public Tuple exec(Tuple input) throws IOException {
- try {
- Tuple t = mTupleFactory.newTuple(2);
- t.set(0, new Double(sum(input)));
- t.set(1, new Long(count(input)));
- return t;
- } catch (ExecException t) {
- throw new RuntimeException(t.getMessage() + ": " + input);
- }
- }
- }
-
- static public class Intermed extends EvalFunc<Tuple> {
- @Override
- public Tuple exec(Tuple input) throws IOException {
- DataBag b = null;
- Tuple t = null;
- try {
- b = (DataBag) input.get(0);
- t = combine(b);
- } catch (ExecException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- return t;
- }
- }
-
- static public class Final extends EvalFunc<Double> {
- @Override
- public Double exec(Tuple input) throws IOException {
- double sum = 0;
- double count = 0;
- try {
- DataBag b = (DataBag) input.get(0);
- Tuple combined = combine(b);
-
- sum = (Double) combined.get(0);
- count = (Long) combined.get(1);
- } catch (ExecException e) {
- e.printStackTrace();
- }
-
- double avg = 0;
- if (count > 0) {
- avg = sum / count;
- }
- return new Double(avg);
- }
- }
-
- static protected Tuple combine(DataBag values) throws ExecException {
- double sum = 0;
- long count = 0;
-
- Tuple output = mTupleFactory.newTuple(2);
-
- for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
- Tuple t = it.next();
- sum += (Double) t.get(0);
- count += (Long) t.get(1);
- }
-
- output.set(0, new Double(sum));
- output.set(1, new Long(count));
- return output;
- }
-
- static protected long count(Tuple input) throws ExecException {
- DataBag values = (DataBag) input.get(0);
- return values.size();
- }
-
- static protected double sum(Tuple input) throws ExecException {
- DataBag values = (DataBag) input.get(0);
-
- double sum = 0;
- for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
- Tuple t = it.next();
- Double d = DataType.toDouble(t.get(0));
- if (d == null)
- continue;
- sum += d;
- }
-
- return sum;
- }
-
- @Override
- public Schema outputSchema(Schema input) {
- // TODO FIX
- // return new AtomSchema("average");
- return null;
- }
-
- }
-
- @Test
- public void testUserFuncArity() throws ExecException {
- DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
- MAX_TUPLES, 100);
- String funcSpec = ARITY.class.getName() + "()";
- PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
- List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
- inputs.add(read);
- POUserFunc userFunc = new POUserFunc(new OperatorKey("", r.nextLong()),
- -1, inputs, funcSpec);
- Result res = new Result();
- Integer i = null;
- res = userFunc.getNext(i);
- while (res.returnStatus != POStatus.STATUS_EOP) {
- // System.out.println(res.result);
- int result = (Integer) res.result;
- assertEquals(2, result);
- res = userFunc.getNext(i);
- }
- }
-
- @Test
- public void testUDFCompare() throws ExecException {
- DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r, 2,
- 100);
- String funcSpec = WeirdComparator.class.getName() + "()";
- POUserFunc userFunc = new POUserComparisonFunc(new OperatorKey("", r.nextLong()),
- -1, null, funcSpec);
- Iterator<Tuple> it = input.iterator();
- Tuple t1 = it.next();
- Tuple t2 = it.next();
- t1.append(2);
- t2.append(3);
- ((POUserComparisonFunc)userFunc).attachInput(t1, t2);
- Integer i = null;
- // System.out.println(t1 + " " + t2);
- int result = (Integer) (userFunc.getNext(i).result);
- assertEquals(-1, result);
- }
-
- @Test
- public void testAlgebraicAVG() throws IOException, ExecException {
- int input[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
- byte INIT = 0;
- byte INTERMED = 1;
- byte FINAL = 2;
- Tuple tup1 = Util.loadNestTuple(TupleFactory.getInstance().newTuple(1),
- input);
- Tuple tup2 = Util.loadNestTuple(TupleFactory.getInstance().newTuple(1),
- input);
- // System.out.println("Input = " + tup1);
- String funcSpec = AVG.class.getName() + "()";
-
- POUserFunc po = new POUserFunc(new OperatorKey("", r.nextLong()), -1,
- null, funcSpec);
-
- TupleFactory tf = TupleFactory.getInstance();
-
- po.setAlgebraicFunction(INIT);
- po.attachInput(tup1);
- Tuple t = null;
- Result res = po.getNext(t);
- Tuple outputInitial1 = (res.returnStatus == POStatus.STATUS_OK) ? (Tuple) res.result
- : null;
- Tuple outputInitial2 = (res.returnStatus == POStatus.STATUS_OK) ? (Tuple) res.result
- : null;
- System.out.println(outputInitial1 + " " + outputInitial2);
- assertEquals(outputInitial1, outputInitial2);
- double sum = (Double) outputInitial1.get(0);
- long count = (Long) outputInitial1.get(1);
- assertEquals(55.0, sum);
- assertEquals(10, count);
- DataBag bag = BagFactory.getInstance().newDefaultBag();
- bag.add(outputInitial1);
- bag.add(outputInitial2);
- Tuple outputInitial = tf.newTuple();
- outputInitial.append(bag);
- // Tuple outputIntermed = intermed.exec(outputInitial);
- po = new POUserFunc(new OperatorKey("", r.nextLong()), -1, null,
- funcSpec);
- po.setAlgebraicFunction(INTERMED);
- po.attachInput(outputInitial);
- res = po.getNext(t);
- Tuple outputIntermed = (res.returnStatus == POStatus.STATUS_OK) ? (Tuple) res.result
- : null;
-
- sum = (Double) outputIntermed.get(0);
- count = (Long) outputIntermed.get(1);
- assertEquals(110.0, sum);
- assertEquals(20, count);
- System.out.println(outputIntermed);
- po = new POUserFunc(new OperatorKey("", r.nextLong()), -1, null,
- funcSpec);
- po.setAlgebraicFunction(FINAL);
- po.attachInput(outputInitial);
- res = po.getNext(t);
- Double output = (res.returnStatus == POStatus.STATUS_OK) ? (Double) res.result
- : null;
- // Double output = fin.exec(outputInitial);
- assertEquals(5.5, output);
- // System.out.println("output = " + output);
+ @Override
+ public Integer exec(Tuple input) throws IOException {
+ return new Integer(input.size());
+ }
+
+ @Override
+ public Schema outputSchema(Schema input) {
+ // TODO FIX
+ // return new AtomSchema("arity");
+ return null;
+ }
+ }
+
+ public static class WeirdComparator extends ComparisonFunc {
+
+ @Override
+ public int compare(Tuple t1, Tuple t2) {
+ // TODO Auto-generated method stub
+ Object o1 = null;
+ Object o2 = null;
+ try {
+ o1 = t1.get(2);
+ o2 = t2.get(2);
+ } catch (ExecException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ int i1 = (Integer) o1 - 2;
+ int i2 = (Integer) o2 - 2;
+
+ return (int) (i1 * i1 - i2 * i2);
+ }
+
+ }
+
+ /**
+ * Generates the average of the values of the first field of a tuple. This
+ * class is Algebraic in implemenation, so if possible the execution will be
+ * split into a local and global application
+ */
+ public static class AVG extends EvalFunc<Double> implements Algebraic {
+
+ private static TupleFactory mTupleFactory = TupleFactory.getInstance();
+
+ @Override
+ public Double exec(Tuple input) throws IOException {
+ double sum = 0;
+ double count = 0;
+
+ try {
+ sum = sum(input);
+ count = count(input);
+ } catch (ExecException e) {
+ e.printStackTrace();
+ }
+
+ double avg = 0;
+ if (count > 0)
+ avg = sum / count;
+
+ return new Double(avg);
+ }
+
+ public String getInitial() {
+ return Initial.class.getName();
+ }
+
+ public String getIntermed() {
+ return Intermed.class.getName();
+ }
+
+ public String getFinal() {
+ return Final.class.getName();
+ }
+
+ static public class Initial extends EvalFunc<Tuple> {
+ @Override
+ public Tuple exec(Tuple input) throws IOException {
+ try {
+ Tuple t = mTupleFactory.newTuple(2);
+ t.set(0, new Double(sum(input)));
+ t.set(1, new Long(count(input)));
+ return t;
+ } catch (ExecException t) {
+ throw new RuntimeException(t.getMessage() + ": " + input);
+ }
+ }
+ }
+
+ static public class Intermed extends EvalFunc<Tuple> {
+ @Override
+ public Tuple exec(Tuple input) throws IOException {
+ DataBag b = null;
+ Tuple t = null;
+ try {
+ b = (DataBag) input.get(0);
+ t = combine(b);
+ } catch (ExecException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ return t;
+ }
+ }
+
+ static public class Final extends EvalFunc<Double> {
+ @Override
+ public Double exec(Tuple input) throws IOException {
+ double sum = 0;
+ double count = 0;
+ try {
+ DataBag b = (DataBag) input.get(0);
+ Tuple combined = combine(b);
+
+ sum = (Double) combined.get(0);
+ count = (Long) combined.get(1);
+ } catch (ExecException e) {
+ e.printStackTrace();
+ }
+
+ double avg = 0;
+ if (count > 0) {
+ avg = sum / count;
+ }
+ return new Double(avg);
+ }
+ }
+
+ static protected Tuple combine(DataBag values) throws ExecException {
+ double sum = 0;
+ long count = 0;
+
+ Tuple output = mTupleFactory.newTuple(2);
+
+ for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
+ Tuple t = it.next();
+ sum += (Double) t.get(0);
+ count += (Long) t.get(1);
+ }
+
+ output.set(0, new Double(sum));
+ output.set(1, new Long(count));
+ return output;
+ }
+
+ static protected long count(Tuple input) throws ExecException {
+ DataBag values = (DataBag) input.get(0);
+ return values.size();
+ }
+
+ static protected double sum(Tuple input) throws ExecException {
+ DataBag values = (DataBag) input.get(0);
+
+ double sum = 0;
+ for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
+ Tuple t = it.next();
+ Double d = DataType.toDouble(t.get(0));
+ if (d == null)
+ continue;
+ sum += d;
+ }
+
+ return sum;
+ }
+
+ @Override
+ public Schema outputSchema(Schema input) {
+ // TODO FIX
+ // return new AtomSchema("average");
+ return null;
+ }
+
+ }
+
+ @Test
+ public void testUserFuncArity() throws ExecException {
+ DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r,
+ MAX_TUPLES, 100);
+ String funcSpec = ARITY.class.getName() + "()";
+ PORead read = new PORead(new OperatorKey("", r.nextLong()), input);
+ List<PhysicalOperator> inputs = new LinkedList<PhysicalOperator>();
+ inputs.add(read);
+ POUserFunc userFunc = new POUserFunc(new OperatorKey("", r.nextLong()),
+ -1, inputs, funcSpec);
+ Result res = new Result();
+ Integer i = null;
+ res = userFunc.getNext(i);
+ while (res.returnStatus != POStatus.STATUS_EOP) {
+ // System.out.println(res.result);
+ int result = (Integer) res.result;
+ assertEquals(2, result);
+ res = userFunc.getNext(i);
+ }
+ }
+
+ @Test
+ public void testUDFCompare() throws ExecException {
+ DataBag input = (DataBag) GenRandomData.genRandSmallTupDataBag(r, 2,
+ 100);
+ String funcSpec = WeirdComparator.class.getName() + "()";
+ POUserFunc userFunc = new POUserComparisonFunc(new OperatorKey("", r.nextLong()),
+ -1, null, funcSpec);
+ Iterator<Tuple> it = input.iterator();
+ Tuple t1 = it.next();
+ Tuple t2 = it.next();
+ t1.append(2);
+ t2.append(3);
+ ((POUserComparisonFunc)userFunc).attachInput(t1, t2);
+ Integer i = null;
+ // System.out.println(t1 + " " + t2);
+ int result = (Integer) (userFunc.getNext(i).result);
+ assertEquals(-1, result);
+ }
+
+ @Test
+ public void testAlgebraicAVG() throws IOException, ExecException {
+ int input[] = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
+ byte INIT = 0;
+ byte INTERMED = 1;
+ byte FINAL = 2;
+ Tuple tup1 = Util.loadNestTuple(TupleFactory.getInstance().newTuple(1),
+ input);
+ Tuple tup2 = Util.loadNestTuple(TupleFactory.getInstance().newTuple(1),
+ input);
+ // System.out.println("Input = " + tup1);
+ String funcSpec = AVG.class.getName() + "()";
+
+ POUserFunc po = new POUserFunc(new OperatorKey("", r.nextLong()), -1,
+ null, funcSpec);
+
+ TupleFactory tf = TupleFactory.getInstance();
+
+ po.setAlgebraicFunction(INIT);
+ po.attachInput(tup1);
+ Tuple t = null;
+ Result res = po.getNext(t);
+ Tuple outputInitial1 = (res.returnStatus == POStatus.STATUS_OK) ? (Tuple) res.result
+ : null;
+ Tuple outputInitial2 = (res.returnStatus == POStatus.STATUS_OK) ? (Tuple) res.result
+ : null;
+ System.out.println(outputInitial1 + " " + outputInitial2);
+ assertEquals(outputInitial1, outputInitial2);
+ double sum = (Double) outputInitial1.get(0);
+ long count = (Long) outputInitial1.get(1);
+ assertEquals(55.0, sum);
+ assertEquals(10, count);
+ DataBag bag = BagFactory.getInstance().newDefaultBag();
+ bag.add(outputInitial1);
+ bag.add(outputInitial2);
+ Tuple outputInitial = tf.newTuple();
+ outputInitial.append(bag);
+ // Tuple outputIntermed = intermed.exec(outputInitial);
+ po = new POUserFunc(new OperatorKey("", r.nextLong()), -1, null,
+ funcSpec);
+ po.setAlgebraicFunction(INTERMED);
+ po.attachInput(outputInitial);
+ res = po.getNext(t);
+ Tuple outputIntermed = (res.returnStatus == POStatus.STATUS_OK) ? (Tuple) res.result
+ : null;
+
+ sum = (Double) outputIntermed.get(0);
+ count = (Long) outputIntermed.get(1);
+ assertEquals(110.0, sum);
+ assertEquals(20, count);
+ System.out.println(outputIntermed);
+ po = new POUserFunc(new OperatorKey("", r.nextLong()), -1, null,
+ funcSpec);
+ po.setAlgebraicFunction(FINAL);
+ po.attachInput(outputInitial);
+ res = po.getNext(t);
+ Double output = (res.returnStatus == POStatus.STATUS_OK) ? (Double) res.result
+ : null;
+ // Double output = fin.exec(outputInitial);
+ assertEquals(5.5, output);
+ // System.out.println("output = " + output);
- }
+ }
}