You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2013/07/19 23:59:57 UTC

svn commit: r1505044 [2/2] - in /pig/trunk: contrib/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/evaluation/ src/org/apache/pig/data/

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/evaluation/TestOver.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/evaluation/TestOver.java?rev=1505044&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/evaluation/TestOver.java (added)
+++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/evaluation/TestOver.java Fri Jul 19 21:59:56 2013
@@ -0,0 +1,1629 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.piggybank.evaluation;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestOver {
+
+    @Test
+    public void testSchema() throws Exception {
+        // No type
+        Over func = new Over();
+        Schema in = Schema.generateNestedSchema(DataType.BAG, DataType.INTEGER);
+        Schema out = func.outputSchema(in);
+        assertEquals("{{NULL}}", out.toString());
+
+        // chararray
+        func = new Over("chararray");
+        in = Schema.generateNestedSchema(DataType.BAG, DataType.INTEGER);
+        out = func.outputSchema(in);
+        assertEquals("{{chararray}}", out.toString());
+
+        // int
+        func = new Over("Int");
+        in = Schema.generateNestedSchema(DataType.BAG, DataType.INTEGER);
+        out = func.outputSchema(in);
+        assertEquals("{{int}}", out.toString());
+
+        // double
+        func = new Over("DOUBLE");
+        in = Schema.generateNestedSchema(DataType.BAG, DataType.INTEGER);
+        out = func.outputSchema(in);
+        assertEquals("{{double}}", out.toString());
+    }
+
+    @Test
+    public void testBadInput() throws Exception {
+        Over func = new Over();
+        boolean caught = false;
+        Tuple t = TupleFactory.getInstance().newTuple();
+        t.append("Mary had a little lamb");
+        t.append("count");
+        t.append(0);
+        t.append(0);
+        try {
+            func.exec(t);
+        } catch (ExecException ee) {
+            caught = true;
+            assertEquals("Over expected a bag for arg 1 but received chararray",
+                    ee.getMessage());
+        }
+        assertTrue(caught);
+
+        func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, 1);
+            inbag.add(t);
+        }
+        t = TupleFactory.getInstance().newTuple();
+        t.append(inbag);
+        caught = false;
+        try {
+            func.exec(t);
+        } catch (ExecException ee) {
+            caught = true;
+            assertEquals("Over expected 2 or more inputs but received 1",
+                    ee.getMessage());
+        }
+        assertTrue(caught);
+
+        func = new Over();
+        t.append(1);
+        caught = false;
+        try {
+            func.exec(t);
+        } catch (ExecException ee) {
+            caught = true;
+            assertEquals("Over expected a string for arg 2 but received int",
+                    ee.getMessage());
+        }
+        assertTrue(caught);
+
+        func = new Over();
+        t.set(1, "count");
+        t.append("fred");
+        caught = false;
+        try {
+            func.exec(t);
+        } catch (ExecException ee) {
+            caught = true;
+            assertEquals("Over expected an integer for arg 3 but received chararray",
+                    ee.getMessage());
+        }
+        assertTrue(caught);
+
+        func = new Over();
+        t.set(2, -1);
+        t.append("fred");
+        caught = false;
+        try {
+            func.exec(t);
+        } catch (ExecException ee) {
+            caught = true;
+            assertEquals("Over expected an integer for arg 4 but received chararray",
+                    ee.getMessage());
+        }
+        assertTrue(caught);
+    }
+    
+    @Test
+    public void testBagFunc() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, 1);
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, inbag);
+        t.set(1, "fred");
+        boolean caught = false;
+        try {
+            DataBag outbag = func.exec(t);
+        } catch (ExecException ee) {
+            caught = true;
+            assertEquals("Unknown aggregate fred", ee.getMessage());
+        }
+        assertTrue(caught);
+    }
+
+
+    @Test
+    public void testCountNoWindow() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, 1);
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(4);
+        t.set(0, inbag);
+        t.set(1, "count");
+        t.set(2, -1);
+        t.set(3, -1);
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            assertEquals(new Long(10), to.get(0));
+        }
+    }
+
+    @Test
+    public void testCountPrecedingUnboundedToCurrent() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, 1);
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, inbag);
+        t.set(1, "count");
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        int cnt = 1;
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            assertEquals(new Long(cnt++), to.get(0));
+        }
+    }
+    
+    @Test
+    public void testCountCurrentToUnboundedFollowing() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, 1);
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(4);
+        t.set(0, inbag);
+        t.set(1, "count");
+        t.set(2, 0);
+        t.set(3, -1);
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        int cnt = 10;
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            assertEquals(new Long(cnt--), to.get(0));
+        }
+    }
+
+    @Test
+    public void testThreeBeforeAndAfter() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, 1);
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(4);
+        t.set(0, inbag);
+        t.set(1, "sum(int)");
+        t.set(2, 3);
+        t.set(3, 3);
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        int sum = 1;
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            switch (sum++) {
+                case 1:
+                case 10:
+                    assertEquals(new Long(4), to.get(0));
+                    break;
+
+                case 2:
+                case 9:
+                    assertEquals(new Long(5), to.get(0));
+                    break;
+
+                case 3:
+                case 8:
+                    assertEquals(new Long(6), to.get(0));
+                    break;
+
+                case 4:
+                case 5:
+                case 6:
+                case 7:
+                    assertEquals(new Long(7), to.get(0));
+                    break;
+
+                default:
+                    // Huh?
+                    throw new RuntimeException("We shouldn't be here, sum is "
+                            + sum);
+            }
+        }
+    }
+
+    @Test
+    public void testSumDouble() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, 1.0);
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(4);
+        t.set(0, inbag);
+        t.set(1, "sum(double)");
+        t.set(2, -1);
+        t.set(3, -1);
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            assertEquals(new Double(10.0), to.get(0));
+        }
+    }
+
+    @Test
+    public void testSumByteArray() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, new DataByteArray("1"));
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(4);
+        t.set(0, inbag);
+        t.set(1, "sum(bytearray)");
+        t.set(2, -1);
+        t.set(3, -1);
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            assertEquals(new Double(10.0), to.get(0));
+        }
+    }
+
+    @Test
+    public void testSumFloat() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, 1.0f);
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(4);
+        t.set(0, inbag);
+        t.set(1, "sum(float)");
+        t.set(2, -1);
+        t.set(3, -1);
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            assertEquals(new Double(10.0), to.get(0));
+        }
+    }
+
+    @Test
+    public void testSumInt() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, 1);
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(4);
+        t.set(0, inbag);
+        t.set(1, "sum(int)");
+        t.set(2, -1);
+        t.set(3, -1);
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            assertEquals(new Long(10), to.get(0));
+        }
+    }
+
+    @Test
+    public void testSumLong() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, 1L);
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(4);
+        t.set(0, inbag);
+        t.set(1, "sum(long)");
+        t.set(2, -1);
+        t.set(3, -1);
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            assertEquals(new Long(10), to.get(0));
+        }
+    }
+
+    @Test
+    public void testAvgDouble() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, (double)i);
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(4);
+        t.set(0, inbag);
+        t.set(1, "avg(double)");
+        t.set(2, -1);
+        t.set(3, -1);
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            assertEquals(new Double(4.5), to.get(0));
+        }
+    }
+
+    @Test
+    public void testAvgByteArray() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, new DataByteArray("1"));
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(4);
+        t.set(0, inbag);
+        t.set(1, "avg(bytearray)");
+        t.set(2, -1);
+        t.set(3, -1);
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            assertEquals(new Double(1.0), to.get(0));
+        }
+    }
+
+    @Test
+    public void testAvgFloat() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, (float)i);
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(4);
+        t.set(0, inbag);
+        t.set(1, "avg(float)");
+        t.set(2, -1);
+        t.set(3, -1);
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            assertEquals(new Double(4.5), to.get(0));
+        }
+    }
+
+    @Test
+    public void testAvgInt() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, i);
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(4);
+        t.set(0, inbag);
+        t.set(1, "avg(int)");
+        t.set(2, -1);
+        t.set(3, -1);
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            assertEquals(new Double(4.5), to.get(0));
+        }
+    }
+
+    @Test
+    public void testAvgLong() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, (long)i);
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(4);
+        t.set(0, inbag);
+        t.set(1, "avg(long)");
+        t.set(2, -1);
+        t.set(3, -1);
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            assertEquals(new Double(4.5), to.get(0));
+        }
+    }
+    
+    @Test
+    public void testMinDouble() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, (double)i);
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, inbag);
+        t.set(1, "min(double)");
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            assertEquals(new Double(0.0), to.get(0));
+        }
+    }
+
+    @Test
+    public void testMinByteArray() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, new DataByteArray(new Integer(i).toString()));
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, inbag);
+        t.set(1, "min(bytearray)");
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            assertEquals(new Double(0.0), to.get(0));
+        }
+    }
+
+    @Test
+    public void testMinFloat() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, (float)i);
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, inbag);
+        t.set(1, "min(float)");
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            assertEquals(new Float(0.0), to.get(0));
+        }
+    }
+
+    @Test
+    public void testMinInt() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, i);
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, inbag);
+        t.set(1, "min(int)");
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            assertEquals(new Integer(0), to.get(0));
+        }
+    }
+
+    @Test
+    public void testMinLong() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, (long)i);
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, inbag);
+        t.set(1, "min(long)");
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            assertEquals(new Long(0), to.get(0));
+        }
+    }
+
+    @Test
+    public void testMinString() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, new Integer(i).toString());
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, inbag);
+        t.set(1, "min(chararray)");
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            assertEquals("0", to.get(0));
+        }
+    }
+
+    @Test
+    public void testMaxDouble() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, (double)i);
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, inbag);
+        t.set(1, "max(double)");
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        int count = 0;
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            assertEquals(new Double(count++), to.get(0));
+        }
+    }
+
+    @Test
+    public void testMaxByteArray() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, new DataByteArray(new Integer(i).toString()));
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, inbag);
+        t.set(1, "max(bytearray)");
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        int count = 0;
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            assertEquals(new Double(count++), to.get(0));
+        }
+    }
+
+    @Test
+    public void testMaxFloat() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, (float)i);
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, inbag);
+        t.set(1, "max(float)");
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        int count = 0;
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            assertEquals(new Float(count++), to.get(0));
+        }
+    }
+
+    @Test
+    public void testMaxInt() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, i);
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, inbag);
+        t.set(1, "max(int)");
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        int count = 0;
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            assertEquals(new Integer(count++), to.get(0));
+        }
+    }
+
+    @Test
+    public void testMaxLong() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, (long)i);
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, inbag);
+        t.set(1, "max(long)");
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        int count = 0;
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            assertEquals(new Long(count++), to.get(0));
+        }
+    }
+
+    @Test
+    public void testMaxString() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, new Integer(i).toString());
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(4);
+        t.set(0, inbag);
+        t.set(1, "max(chararray)");
+        t.set(2, -1);
+        t.set(3, -1);
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            assertEquals("9", to.get(0));
+        }
+    }
+
+    @Test
+    public void testRowNumber() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, (double)i);
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(4);
+        t.set(0, inbag);
+        t.set(1, "row_number");
+        t.set(2, -1);
+        t.set(3, -1);
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        int count = 1;
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            assertEquals(new Integer(count++), to.get(0));
+        }
+    }
+
+    @Test
+    public void testFirstValue() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, (double)i);
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, inbag);
+        t.set(1, "first_value");
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            assertEquals(new Double(0.0), to.get(0));
+        }
+    }
+
+    @Test
+    public void testLastValue() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, i);
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, inbag);
+        t.set(1, "last_value");
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        int count = 0;
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            assertEquals(new Integer(count++), to.get(0));
+        }
+    }
+
+    @Test
+    public void testLeadDefaults() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, i);
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(4);
+        t.set(0, inbag);
+        t.set(1, "lead");
+        t.set(2, -1);
+        t.set(3, -1);
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        int count = 1;
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            if (count < 10) assertEquals(new Integer(count++), to.get(0));
+            else assertNull(to.get(0));
+        }
+    }
+
+    @Test
+    public void testLeadWithRowsAheadNoDefault() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, i);
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(5);
+        t.set(0, inbag);
+        t.set(1, "lead");
+        t.set(2, -1);
+        t.set(3, -1);
+        t.set(4, 3);
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        int count = 3;
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            if (count < 10) assertEquals(new Integer(count++), to.get(0));
+            else assertNull(to.get(0));
+        }
+    }
+
+    @Test
+    public void testLeadWithRowsAheadDefault() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, i);
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(6);
+        t.set(0, inbag);
+        t.set(1, "lead");
+        t.set(2, -1);
+        t.set(3, -1);
+        t.set(4, 3);
+        t.set(5, 99);
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        int count = 3;
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            if (count < 10) assertEquals(new Integer(count++), to.get(0));
+            else assertEquals(new Integer(99), to.get(0));
+        }
+    }
+
+    @Test
+    public void testLagDefaults() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, i);
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(4);
+        t.set(0, inbag);
+        t.set(1, "lag");
+        t.set(2, -1);
+        t.set(3, -1);
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        int count = -1;
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            try {
+                if (count >= 0) assertEquals(new Integer(count), to.get(0));
+                else assertNull(to.get(0));
+            } finally {
+                count++;
+            }
+        }
+    }
+
+    @Test
+    public void testLagWithRowsBehindNoDefault() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, i);
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(5);
+        t.set(0, inbag);
+        t.set(1, "lag");
+        t.set(2, -1);
+        t.set(3, -1);
+        t.set(4, 3);
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        int count = -3;
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            try {
+                if (count >= 0) assertEquals(new Integer(count), to.get(0));
+                else assertNull(to.get(0));
+            } finally {
+                count++;
+            }
+        }
+    }
+
+    @Test
+    public void testLagWithRowsBehindDefault() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, i);
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(6);
+        t.set(0, inbag);
+        t.set(1, "lag");
+        t.set(2, -1);
+        t.set(3, -1);
+        t.set(4, 3);
+        t.set(5, 99);
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        int count = -3;
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            try {
+                if (count >= 0) assertEquals(new Integer(count), to.get(0));
+                else assertEquals(new Integer(99), to.get(0));
+            } finally {
+                count++;
+            }
+        }
+    }
+
+    @Test
+    public void testRankNoArgs() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, i);
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, inbag);
+        t.set(1, "rank");
+        boolean caught = false;
+        try {
+            DataBag outbag = func.exec(t);
+        } catch (ExecException ioe) {
+            caught = true;
+            assertTrue(ioe.getMessage().contains("Rank args must contain"));
+        }
+        assertTrue(caught);
+    }
+
+    @Test
+    public void testRankBadArgs() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, i);
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(5);
+        t.set(0, inbag);
+        t.set(1, "rank");
+        t.set(2, -1);
+        t.set(3, -1);
+        t.set(4, "fred");
+        boolean caught = false;
+        try {
+            DataBag outbag = func.exec(t);
+        } catch (ExecException ioe) {
+            caught = true;
+            assertTrue(ioe.getMessage().contains("Rank expected column number"));
+        }
+        assertTrue(caught);
+    }
+
+    @Test
+    public void testRankSimple() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        Random r = new Random();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(2);
+            t.set(0, i);
+            t.set(1, r.nextInt(100));
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(5);
+        t.set(0, inbag);
+        t.set(1, "rank");
+        t.set(2, -1);
+        t.set(3, -1);
+        t.set(4, 0);
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        int count = 1;
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            assertEquals(count++, to.get(0));
+        }
+    }
+
+    @Test
+    public void testRankWithRepeatValues() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        Random r = new Random();
+        Tuple t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, null);
+        t.set(1, r.nextInt(100));
+        inbag.add(t);
+        t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, null);
+        t.set(1, r.nextInt(100));
+        inbag.add(t);
+        t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, 2);
+        t.set(1, r.nextInt(100));
+        inbag.add(t);
+        t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, 5);
+        t.set(1, r.nextInt(100));
+        inbag.add(t);
+        t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, 5);
+        t.set(1, r.nextInt(100));
+        inbag.add(t);
+        t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, 5);
+        t.set(1, r.nextInt(100));
+        inbag.add(t);
+        t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, 7);
+        t.set(1, r.nextInt(100));
+        inbag.add(t);
+
+        t = TupleFactory.getInstance().newTuple(5);
+        t.set(0, inbag);
+        t.set(1, "rank");
+        t.set(2, -1);
+        t.set(3, -1);
+        t.set(4, 0);
+        DataBag outbag = func.exec(t);
+        assertEquals(7, outbag.size());
+        Iterator<Tuple> iter = outbag.iterator();
+        t = iter.next();
+        assertEquals(1, t.get(0));
+        t = iter.next();
+        assertEquals(1, t.get(0));
+        t = iter.next();
+        assertEquals(3, t.get(0));
+        t = iter.next();
+        assertEquals(4, t.get(0));
+        t = iter.next();
+        assertEquals(4, t.get(0));
+        t = iter.next();
+        assertEquals(4, t.get(0));
+        t = iter.next();
+        assertEquals(7, t.get(0));
+    }
+
+   @Test
+    public void testRankWithMultiKey() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        Random r = new Random();
+        Tuple t = TupleFactory.getInstance().newTuple(3);
+        t.set(0, null);
+        t.set(1, r.nextInt(100));
+        t.set(2, "a");
+        inbag.add(t);
+        t = TupleFactory.getInstance().newTuple(3);
+        t.set(0, null);
+        t.set(1, r.nextInt(100));
+        t.set(2, "b");
+        inbag.add(t);
+        t = TupleFactory.getInstance().newTuple(3);
+        t.set(0, 2);
+        t.set(1, r.nextInt(100));
+        t.set(2, "b");
+        inbag.add(t);
+        t = TupleFactory.getInstance().newTuple(3);
+        t.set(0, 5);
+        t.set(1, r.nextInt(100));
+        inbag.add(t);
+        t.set(2, "b");
+        t = TupleFactory.getInstance().newTuple(3);
+        t.set(0, 5);
+        t.set(1, r.nextInt(100));
+        t.set(2, "c");
+        inbag.add(t);
+        t = TupleFactory.getInstance().newTuple(3);
+        t.set(0, 5);
+        t.set(1, r.nextInt(100));
+        t.set(2, "c");
+        inbag.add(t);
+        t = TupleFactory.getInstance().newTuple(3);
+        t.set(0, 7);
+        t.set(1, r.nextInt(100));
+        t.set(2, "z");
+        inbag.add(t);
+
+        t = TupleFactory.getInstance().newTuple(6);
+        t.set(0, inbag);
+        t.set(1, "rank");
+        t.set(2, -1);
+        t.set(3, -1);
+        t.set(4, 0);
+        t.set(5, 2);
+        DataBag outbag = func.exec(t);
+        assertEquals(7, outbag.size());
+        Iterator<Tuple> iter = outbag.iterator();
+        t = iter.next();
+        assertEquals(1, t.get(0));
+        t = iter.next();
+        assertEquals(2, t.get(0));
+        t = iter.next();
+        assertEquals(3, t.get(0));
+        t = iter.next();
+        assertEquals(4, t.get(0));
+        t = iter.next();
+        assertEquals(5, t.get(0));
+        t = iter.next();
+        assertEquals(5, t.get(0));
+        t = iter.next();
+        assertEquals(7, t.get(0));
+    }
+
+    @Test
+    public void testDenseRankSimple() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        Random r = new Random();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(2);
+            t.set(0, i);
+            t.set(1, r.nextInt(100));
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(5);
+        t.set(0, inbag);
+        t.set(1, "dense_rank");
+        t.set(2, -1);
+        t.set(3, -1);
+        t.set(4, 0);
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        int count = 1;
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            assertEquals(count++, to.get(0));
+        }
+    }
+
+    @Test
+    public void testDenseRankWithRepeatValues() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        Random r = new Random();
+        Tuple t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, null);
+        t.set(1, r.nextInt(100));
+        inbag.add(t);
+        t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, null);
+        t.set(1, r.nextInt(100));
+        inbag.add(t);
+        t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, 2);
+        t.set(1, r.nextInt(100));
+        inbag.add(t);
+        t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, 5);
+        t.set(1, r.nextInt(100));
+        inbag.add(t);
+        t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, 5);
+        t.set(1, r.nextInt(100));
+        inbag.add(t);
+        t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, 5);
+        t.set(1, r.nextInt(100));
+        inbag.add(t);
+        t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, 7);
+        t.set(1, r.nextInt(100));
+        inbag.add(t);
+
+        t = TupleFactory.getInstance().newTuple(5);
+        t.set(0, inbag);
+        t.set(1, "dense_rank");
+        t.set(2, -1);
+        t.set(3, -1);
+        t.set(4, 0);
+        DataBag outbag = func.exec(t);
+        assertEquals(7, outbag.size());
+        Iterator<Tuple> iter = outbag.iterator();
+        t = iter.next();
+        assertEquals(1, t.get(0));
+        t = iter.next();
+        assertEquals(1, t.get(0));
+        t = iter.next();
+        assertEquals(2, t.get(0));
+        t = iter.next();
+        assertEquals(3, t.get(0));
+        t = iter.next();
+        assertEquals(3, t.get(0));
+        t = iter.next();
+        assertEquals(3, t.get(0));
+        t = iter.next();
+        assertEquals(4, t.get(0));
+    }
+
+   @Test
+    public void testDenseRankWithMultiKey() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        Random r = new Random();
+        Tuple t = TupleFactory.getInstance().newTuple(3);
+        t.set(0, null);
+        t.set(1, r.nextInt(100));
+        t.set(2, "a");
+        inbag.add(t);
+        t = TupleFactory.getInstance().newTuple(3);
+        t.set(0, null);
+        t.set(1, r.nextInt(100));
+        t.set(2, "b");
+        inbag.add(t);
+        t = TupleFactory.getInstance().newTuple(3);
+        t.set(0, 2);
+        t.set(1, r.nextInt(100));
+        t.set(2, "b");
+        inbag.add(t);
+        t = TupleFactory.getInstance().newTuple(3);
+        t.set(0, 5);
+        t.set(1, r.nextInt(100));
+        inbag.add(t);
+        t.set(2, "b");
+        t = TupleFactory.getInstance().newTuple(3);
+        t.set(0, 5);
+        t.set(1, r.nextInt(100));
+        t.set(2, "c");
+        inbag.add(t);
+        t = TupleFactory.getInstance().newTuple(3);
+        t.set(0, 5);
+        t.set(1, r.nextInt(100));
+        t.set(2, "c");
+        inbag.add(t);
+        t = TupleFactory.getInstance().newTuple(3);
+        t.set(0, 7);
+        t.set(1, r.nextInt(100));
+        t.set(2, "z");
+        inbag.add(t);
+
+        t = TupleFactory.getInstance().newTuple(6);
+        t.set(0, inbag);
+        t.set(1, "dense_rank");
+        t.set(2, -1);
+        t.set(3, -1);
+        t.set(4, 0);
+        t.set(5, 2);
+        DataBag outbag = func.exec(t);
+        assertEquals(7, outbag.size());
+        Iterator<Tuple> iter = outbag.iterator();
+        t = iter.next();
+        assertEquals(1, t.get(0));
+        t = iter.next();
+        assertEquals(2, t.get(0));
+        t = iter.next();
+        assertEquals(3, t.get(0));
+        t = iter.next();
+        assertEquals(4, t.get(0));
+        t = iter.next();
+        assertEquals(5, t.get(0));
+        t = iter.next();
+        assertEquals(5, t.get(0));
+        t = iter.next();
+        assertEquals(6, t.get(0));
+    }
+
+    @Test
+    public void testNtileNoArgs() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, i);
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, inbag);
+        t.set(1, "ntile");
+        boolean caught = false;
+        try {
+            DataBag outbag = func.exec(t);
+        } catch (ExecException ioe) {
+            caught = true;
+            assertTrue(ioe.getMessage().contains("Ntile args must contain"));
+        }
+        assertTrue(caught);
+    }
+
+    @Test
+    public void testNtileBadArgs() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, i);
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(5);
+        t.set(0, inbag);
+        t.set(1, "ntile");
+        t.set(2, -1);
+        t.set(3, -1);
+        t.set(4, "fred");
+        boolean caught = false;
+        try {
+            DataBag outbag = func.exec(t);
+        } catch (ExecException ioe) {
+            caught = true;
+            assertTrue(ioe.getMessage().contains("Ntile expected integer"));
+        }
+        assertTrue(caught);
+    }
+
+    @Test
+    public void testNtileFour() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        Random r = new Random();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(2);
+            t.set(0, i);
+            t.set(1, r.nextInt(100));
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(5);
+        t.set(0, inbag);
+        t.set(1, "ntile");
+        t.set(2, -1);
+        t.set(3, -1);
+        t.set(4, 4);
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        int count = 0;
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            if (count < 3) assertEquals(1, to.get(0));
+            else if (count < 5) assertEquals(2, to.get(0));
+            else if (count < 8) assertEquals(3, to.get(0));
+            else assertEquals(4, to.get(0));
+            count++;
+        }
+    }
+
+    @Test
+    public void testNtileTen() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        Random r = new Random();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(2);
+            t.set(0, i);
+            t.set(1, r.nextInt(100));
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(5);
+        t.set(0, inbag);
+        t.set(1, "ntile");
+        t.set(2, -1);
+        t.set(3, -1);
+        t.set(4, 10);
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        int count = 1;
+        for (Tuple to : outbag) {
+            assertEquals(count, to.get(0));
+            count++;
+        }
+    }
+
+    @Test
+    public void testNtileHundred() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        Random r = new Random();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(2);
+            t.set(0, i);
+            t.set(1, r.nextInt(100));
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(5);
+        t.set(0, inbag);
+        t.set(1, "ntile");
+        t.set(2, -1);
+        t.set(3, -1);
+        t.set(4, 100);
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        int count = 1;
+        for (Tuple to : outbag) {
+            assertEquals(count, to.get(0));
+            count++;
+        }
+    }
+
+    @Test
+    public void testPercentRankSimple() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        Random r = new Random();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(2);
+            t.set(0, i);
+            t.set(1, r.nextInt(100));
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(5);
+        t.set(0, inbag);
+        t.set(1, "percent_rank");
+        t.set(2, -1);
+        t.set(3, -1);
+        t.set(4, 0);
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        int count = 0;
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            assertEquals(count/9.0, to.get(0));
+            count++;
+        }
+    }
+
+    @Test
+    public void testPercentRankWithRepeatValues() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        Random r = new Random();
+        Tuple t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, null);
+        t.set(1, r.nextInt(100));
+        inbag.add(t);
+        t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, null);
+        t.set(1, r.nextInt(100));
+        inbag.add(t);
+        t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, 2);
+        t.set(1, r.nextInt(100));
+        inbag.add(t);
+        t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, 5);
+        t.set(1, r.nextInt(100));
+        inbag.add(t);
+        t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, 5);
+        t.set(1, r.nextInt(100));
+        inbag.add(t);
+        t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, 5);
+        t.set(1, r.nextInt(100));
+        inbag.add(t);
+        t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, 7);
+        t.set(1, r.nextInt(100));
+        inbag.add(t);
+
+        t = TupleFactory.getInstance().newTuple(5);
+        t.set(0, inbag);
+        t.set(1, "percent_rank");
+        t.set(2, -1);
+        t.set(3, -1);
+        t.set(4, 0);
+        DataBag outbag = func.exec(t);
+        assertEquals(7, outbag.size());
+        Iterator<Tuple> iter = outbag.iterator();
+        t = iter.next();
+        assertEquals(0.0, t.get(0));
+        t = iter.next();
+        assertEquals(0.0, t.get(0));
+        t = iter.next();
+        assertEquals(0.3333333333333333, t.get(0));
+        t = iter.next();
+        assertEquals(0.5, t.get(0));
+        t = iter.next();
+        assertEquals(0.5, t.get(0));
+        t = iter.next();
+        assertEquals(0.5, t.get(0));
+        t = iter.next();
+        assertEquals(1.0, t.get(0));
+    }
+
+    @Test
+    public void testCumeDistSimple() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        Random r = new Random();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(2);
+            t.set(0, i);
+            t.set(1, r.nextInt(100));
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(5);
+        t.set(0, inbag);
+        t.set(1, "cume_dist");
+        t.set(2, -1);
+        t.set(3, -1);
+        t.set(4, 0);
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        int count = 1;
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            assertEquals(count/10.0, to.get(0));
+            count++;
+        }
+    }
+
+    @Test
+    public void testCumeDistWithRepeatValues() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        Random r = new Random();
+        Tuple t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, null);
+        t.set(1, r.nextInt(100));
+        inbag.add(t);
+        t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, null);
+        t.set(1, r.nextInt(100));
+        inbag.add(t);
+        t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, 2);
+        t.set(1, r.nextInt(100));
+        inbag.add(t);
+        t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, 5);
+        t.set(1, r.nextInt(100));
+        inbag.add(t);
+        t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, 5);
+        t.set(1, r.nextInt(100));
+        inbag.add(t);
+        t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, 5);
+        t.set(1, r.nextInt(100));
+        inbag.add(t);
+        t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, 7);
+        t.set(1, r.nextInt(100));
+        inbag.add(t);
+
+        t = TupleFactory.getInstance().newTuple(5);
+        t.set(0, inbag);
+        t.set(1, "cume_dist");
+        t.set(2, -1);
+        t.set(3, -1);
+        t.set(4, 0);
+        DataBag outbag = func.exec(t);
+        assertEquals(7, outbag.size());
+        Iterator<Tuple> iter = outbag.iterator();
+        t = iter.next();
+        assertEquals(0.14285714285714285, t.get(0));
+        t = iter.next();
+        assertEquals(0.14285714285714285, t.get(0));
+        t = iter.next();
+        assertEquals(0.42857142857142855, t.get(0));
+        t = iter.next();
+        assertEquals(0.5714285714285714, t.get(0));
+        t = iter.next();
+        assertEquals(0.5714285714285714, t.get(0));
+        t = iter.next();
+        assertEquals(0.5714285714285714, t.get(0));
+        t = iter.next();
+        assertEquals(1.0, t.get(0));
+    }
+}

Added: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/evaluation/TestStitch.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/evaluation/TestStitch.java?rev=1505044&view=auto
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/evaluation/TestStitch.java (added)
+++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/evaluation/TestStitch.java Fri Jul 19 21:59:56 2013
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.piggybank.evaluation;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class TestStitch {
+
+    @Test
+    public void testSchema() throws Exception {
+        Schema s = new Schema();
+        Schema in = new Schema();
+        s.add(new FieldSchema("x", DataType.CHARARRAY));
+        s.add(new FieldSchema("y", DataType.INTEGER));
+        in.add(new FieldSchema("A", s, DataType.BAG));
+        FieldSchema fs = new FieldSchema("Over",
+                new Schema(Schema.generateNestedSchema(DataType.BAG,
+                        DataType.NULL)), DataType.BAG);
+        in.add(fs);
+        Stitch func = new Stitch();
+        Schema out = func.outputSchema(in);
+        assertEquals("{stitched: {x: chararray,y: int,{NULL}}}", out.toString());
+    }
+
+    @Test
+    public void testSchema2() throws Exception {
+        Schema t = new Schema();
+        Schema b = new Schema();
+        t.add(new FieldSchema("x", DataType.CHARARRAY));
+        t.add(new FieldSchema("y", DataType.INTEGER));
+        b.add(new FieldSchema("A", t, DataType.TUPLE));
+        Schema in = new Schema(new FieldSchema("", b, DataType.BAG));
+        FieldSchema fs = new FieldSchema("Over",
+                new Schema(Schema.generateNestedSchema(DataType.BAG,
+                        DataType.NULL)), DataType.BAG);
+        in.add(fs);
+        Stitch func = new Stitch();
+        Schema out = func.outputSchema(in);
+        assertEquals("{stitched: {x: chararray,y: int,{NULL}}}", out.toString());
+    }
+
+
+    @Test
+    public void testNoInput() throws Exception {
+        Stitch func = new Stitch();
+        assertNull(func.exec(null));
+        Tuple t = TupleFactory.getInstance().newTuple();
+        assertNull(func.exec(t));
+    }
+
+    @Test
+    public void testBadInput() throws Exception {
+        Stitch func = new Stitch();
+        boolean caught = false;
+        Tuple t = TupleFactory.getInstance().newTuple();
+        t.append("Mary had a little lamb");
+        try {
+            func.exec(t);
+        } catch (ExecException ee) {
+            caught = true;
+        }
+        assertTrue(caught);
+
+        DataBag b = BagFactory.getInstance().newDefaultBag();
+        b.add(t);
+        t = TupleFactory.getInstance().newTuple();
+        t.append(b);
+        t.append("its fleece was white as snow");
+        caught = false;
+         try {
+            func.exec(t);
+        } catch (ExecException ee) {
+            caught = true;
+        }
+        assertTrue(caught);
+    }
+
+    @Test
+    public void testSingleInput() throws Exception {
+        Stitch func = new Stitch();
+        Tuple t = TupleFactory.getInstance().newTuple();
+        t.append(new Integer(1));
+        DataBag b = BagFactory.getInstance().newDefaultBag();
+        b.add(t);
+        t = TupleFactory.getInstance().newTuple();
+        t.append(b);
+        DataBag out = func.exec(t);
+        assertEquals(1, out.size());
+        assertEquals(new Integer(1), out.iterator().next().get(0));
+    }
+    
+    @Test
+    public void testDoubleInput() throws Exception {
+        Stitch func = new Stitch();
+        DataBag b1 = BagFactory.getInstance().newDefaultBag();
+        Tuple t = TupleFactory.getInstance().newTuple();
+        t.append("a");
+        t.append("b");
+        b1.add(t);
+        t = TupleFactory.getInstance().newTuple();
+        t.append("c");
+        t.append("d");
+        b1.add(t);
+        
+        DataBag b2 = BagFactory.getInstance().newDefaultBag();
+        t = TupleFactory.getInstance().newTuple();
+        t.append("1");
+        t.append("2");
+        b2.add(t);
+        t = TupleFactory.getInstance().newTuple();
+        t.append("3");
+        t.append("4");
+        b2.add(t);
+
+        t = TupleFactory.getInstance().newTuple();
+        t.append(b1);
+        t.append(b2);
+        DataBag out = func.exec(t);
+        assertEquals(2, out.size());
+        Iterator<Tuple> iter = out.iterator();
+        t = iter.next();
+        assertEquals(4, t.size());
+        assertEquals("a", t.get(0));
+        assertEquals("b", t.get(1));
+        assertEquals("1", t.get(2));
+        assertEquals("2", t.get(3));
+        t = iter.next();
+        assertEquals(4, t.size());
+        assertEquals("c", t.get(0));
+        assertEquals("d", t.get(1));
+        assertEquals("3", t.get(2));
+        assertEquals("4", t.get(3));
+    }
+
+    @Test
+    public void testSecondShort() throws Exception {
+        Stitch func = new Stitch();
+        DataBag b1 = BagFactory.getInstance().newDefaultBag();
+        Tuple t = TupleFactory.getInstance().newTuple();
+        t.append("a");
+        t.append("b");
+        b1.add(t);
+        t = TupleFactory.getInstance().newTuple();
+        t.append("c");
+        t.append("d");
+        b1.add(t);
+        
+        DataBag b2 = BagFactory.getInstance().newDefaultBag();
+        t = TupleFactory.getInstance().newTuple();
+        t.append("1");
+        t.append("2");
+        b2.add(t);
+
+        t = TupleFactory.getInstance().newTuple();
+        t.append(b1);
+        t.append(b2);
+        DataBag out = func.exec(t);
+        assertEquals(2, out.size());
+        Iterator<Tuple> iter = out.iterator();
+        t = iter.next();
+        assertEquals(4, t.size());
+        assertEquals("a", t.get(0));
+        assertEquals("b", t.get(1));
+        assertEquals("1", t.get(2));
+        assertEquals("2", t.get(3));
+        t = iter.next();
+        assertEquals(2, t.size());
+        assertEquals("c", t.get(0));
+        assertEquals("d", t.get(1));
+    }
+
+    @Test
+    public void testFirstShort() throws Exception {
+        Stitch func = new Stitch();
+        DataBag b1 = BagFactory.getInstance().newDefaultBag();
+        Tuple t = TupleFactory.getInstance().newTuple();
+        t.append("a");
+        t.append("b");
+        b1.add(t);
+        
+        DataBag b2 = BagFactory.getInstance().newDefaultBag();
+        t = TupleFactory.getInstance().newTuple();
+        t.append("1");
+        t.append("2");
+        b2.add(t);
+        t = TupleFactory.getInstance().newTuple();
+        t.append("3");
+        t.append("4");
+        b2.add(t);
+
+        t = TupleFactory.getInstance().newTuple();
+        t.append(b1);
+        t.append(b2);
+        DataBag out = func.exec(t);
+        assertEquals(1, out.size());
+        Iterator<Tuple> iter = out.iterator();
+        t = iter.next();
+        assertEquals(4, t.size());
+        assertEquals("a", t.get(0));
+        assertEquals("b", t.get(1));
+        assertEquals("1", t.get(2));
+        assertEquals("2", t.get(3));
+    }
+
+          
+}

Modified: pig/trunk/src/org/apache/pig/data/DataType.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/DataType.java?rev=1505044&r1=1505043&r2=1505044&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/DataType.java (original)
+++ pig/trunk/src/org/apache/pig/data/DataType.java Fri Jul 19 21:59:56 2013
@@ -348,6 +348,34 @@ public class DataType {
     }
 
     /**
+     * Get the type code from the type name 
+     * @param name Type name
+     * @return type code
+     */
+    public static byte findTypeByName(String name) {
+        if (name == null) return NULL;
+        else if ("boolean".equalsIgnoreCase(name)) return BOOLEAN;
+        else if ("byte".equalsIgnoreCase(name)) return BYTE;
+        else if ("int".equalsIgnoreCase(name)) return INTEGER;
+        else if ("biginteger".equalsIgnoreCase(name)) return BIGINTEGER;
+        else if ("bigdecimal".equalsIgnoreCase(name)) return BIGDECIMAL;
+        else if ("long".equalsIgnoreCase(name)) return LONG;
+        else if ("float".equalsIgnoreCase(name)) return FLOAT;
+        else if ("double".equalsIgnoreCase(name)) return DOUBLE;
+        else if ("datetime".equalsIgnoreCase(name)) return DATETIME;
+        else if ("bytearray".equalsIgnoreCase(name)) return BYTEARRAY;
+        else if ("bigchararray".equalsIgnoreCase(name)) return BIGCHARARRAY;
+        else if ("chararray".equalsIgnoreCase(name)) return CHARARRAY;
+        else if ("map".equalsIgnoreCase(name)) return MAP;
+        else if ("internalmap".equalsIgnoreCase(name)) return INTERNALMAP;
+        else if ("tuple".equalsIgnoreCase(name)) return TUPLE;
+        else if ("bag".equalsIgnoreCase(name)) return BAG;
+        else if ("generic_writablecomparable".equalsIgnoreCase(name)) return GENERIC_WRITABLECOMPARABLE;
+        else return UNKNOWN;
+    }
+
+
+    /**
      * Determine whether the this data type is complex.
      * @param dataType Data type code to test.
      * @return true if dataType is bag, tuple, or map.