You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text rendering.
Posted to commits@pig.apache.org by da...@apache.org on 2016/06/23 18:15:35 UTC

svn commit: r1749956 - in /pig/trunk: CHANGES.txt contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/Over.java contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/evaluation/TestOver.java

Author: daijy
Date: Thu Jun 23 18:15:35 2016
New Revision: 1749956

URL: http://svn.apache.org/viewvc?rev=1749956&view=rev
Log:
PIG-4906: Add Bigdecimal functions in Over function

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/Over.java
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/evaluation/TestOver.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1749956&r1=1749955&r2=1749956&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Jun 23 18:15:35 2016
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
  
 IMPROVEMENTS
 
+PIG-4906: Add Bigdecimal functions in Over function (cgalan via daijy)
+
 PIG-2768: Fix org.apache.hadoop.conf.Configuration deprecation warnings for Hadoop 23 (rohini)
  
 OPTIMIZATIONS

Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/Over.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/Over.java?rev=1749956&r1=1749955&r2=1749956&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/Over.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/Over.java Thu Jun 23 18:15:35 2016
@@ -23,10 +23,13 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.pig.EvalFunc;
-import org.apache.pig.FuncSpec;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.builtin.AVG;
+import org.apache.pig.builtin.BigDecimalAvg;
+import org.apache.pig.builtin.BigDecimalMax;
+import org.apache.pig.builtin.BigDecimalMin;
+import org.apache.pig.builtin.BigDecimalSum;
 import org.apache.pig.builtin.COUNT;
 import org.apache.pig.builtin.DoubleAvg;
 import org.apache.pig.builtin.DoubleMax;
@@ -54,6 +57,7 @@ import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
 
 /**
  * Given an aggregate function, a bag, and possibly a window definition,
@@ -73,23 +77,27 @@ import org.apache.pig.impl.logicalLayer.
  *           <li>sum(int)</li>
  *           <li>sum(long)</li>
  *           <li>sum(bytearray)</li>
+ *           <li>sum(bigdecimal)</li>
  *           <li>avg(double)</li>
  *           <li>avg(float)</li>
  *           <li>avg(long)</li>
  *           <li>avg(int)</li>
  *           <li>avg(bytearray)</li>
+ *           <li>avg(bigdecimal)</li>
  *           <li>min(double)</li>
  *           <li>min(float)</li>
  *           <li>min(long)</li>
  *           <li>min(int)</li>
  *           <li>min(chararray)</li>
  *           <li>min(bytearray)</li>
+ *           <li>min(bigdecimal)</li>
  *           <li>max(double)</li>
  *           <li>max(float)</li>
  *           <li>max(long)</li>
  *           <li>max(int)</li>
  *           <li>max(chararray)</li>
  *           <li>max(bytearray)</li>
+ *           <li>max(bigdecimal)</li>
  *           <li>row_number</li>
  *           <li>first_value</li>
  *           <li>last_value</li>
@@ -153,7 +161,8 @@ import org.apache.pig.impl.logicalLayer.
  * current row and 3 following) over T;</tt>
  *
  * <p>Over accepts a constructor argument specifying the name and type,
- * colon-separated, of its return schema.</p>
+ * colon-separated, of its return schema. If the argument is 'true', the name and type are instead
+ * taken from the bag's inner field, and the result is named alias + '_over' with the same type.</p>
  *
  * <p><pre>
  * DEFINE IOver org.apache.pig.piggybank.evaluation.Over('state_rk:int');
@@ -188,12 +197,14 @@ public class Over extends EvalFunc<DataB
     private Object[] udfArgs;
     private byte   returnType;
     private String returnName;
+    private boolean searchInnerType;
 
     public Over() {
         initialized = false;
         udfArgs = null;
         func = null;
         returnType = DataType.UNKNOWN;
+        searchInnerType = false;
     }
 
     public Over(String typespec) {
@@ -202,12 +213,16 @@ public class Over extends EvalFunc<DataB
             String[] fn_tn = typespec.split(":", 2);
             this.returnName = fn_tn[0];
             this.returnType = DataType.findTypeByName(fn_tn[1]);
-        } else {
+        } else if(Boolean.parseBoolean(typespec)) {
+            searchInnerType = Boolean.parseBoolean(typespec);
+        }else{
             this.returnName = "result";
             this.returnType = DataType.findTypeByName(typespec);
-        }
+        }       
     }
 
+
+
     @Override
     public DataBag exec(Tuple input) throws IOException {
         if (input == null || input.size() < 2) {
@@ -255,19 +270,42 @@ public class Over extends EvalFunc<DataB
     @Override
     public Schema outputSchema(Schema inputSch) {
         try {
-            if (returnType == DataType.UNKNOWN) {
+            FieldSchema field;
+
+            if (searchInnerType) {
+                field = new FieldSchema(inputSch.getField(0));
+                while (searchInnerType) {
+                    if (field.schema != null
+                            && field.schema.getFields().size() > 1) {
+                        searchInnerType = false;
+                    } else {
+                        if (field.type == DataType.TUPLE
+                                || field.type == DataType.BAG) {
+                            field = new FieldSchema(field.schema.getField(0));
+                        } else {
+                            field.alias = field.alias + "_over";
+                            searchInnerType = false;
+                        }
+                    }
+                }
+
+                searchInnerType = true;
+            } else if (returnType == DataType.UNKNOWN) {
                 return Schema.generateNestedSchema(DataType.BAG, DataType.NULL);
             } else {
-                Schema outputTupleSchema = new Schema(new Schema.FieldSchema(returnName, returnType));
-                return new Schema(new Schema.FieldSchema(
-                        getSchemaName(this.getClass().getName().toLowerCase(), inputSch),
-                            outputTupleSchema, 
-                            DataType.BAG));
+                field = new Schema.FieldSchema(returnName, returnType);
             }
+
+            Schema outputTupleSchema = new Schema(field);
+            return new Schema(new Schema.FieldSchema(getSchemaName(this
+                    .getClass().getName().toLowerCase(), inputSch),
+                    outputTupleSchema, DataType.BAG));
+
         } catch (FrontendException fe) {
             throw new RuntimeException("Unable to create nested schema", fe);
         }
     }
+    
 
     private void init(Tuple input) throws IOException {
         initialized = true;
@@ -329,6 +367,8 @@ public class Over extends EvalFunc<DataB
             func = new LongSum();
         } else if ("sum(bytearray)".equalsIgnoreCase(agg)) {
             func = new SUM();
+        } else if ("sum(bigdecimal)".equalsIgnoreCase(agg)) {
+            func = new BigDecimalSum();
         } else if ("avg(double)".equalsIgnoreCase(agg)) {
             func = new DoubleAvg();
         } else if ("avg(float)".equalsIgnoreCase(agg)) {
@@ -339,6 +379,8 @@ public class Over extends EvalFunc<DataB
             func = new IntAvg();
         } else if ("avg(bytearray)".equalsIgnoreCase(agg)) {
             func = new AVG();
+        } else if ("avg(bigdecimal)".equalsIgnoreCase(agg)) {
+            func = new BigDecimalAvg();
         } else if ("min(double)".equalsIgnoreCase(agg)) {
             func = new DoubleMin();
         } else if ("min(float)".equalsIgnoreCase(agg)) {
@@ -351,6 +393,8 @@ public class Over extends EvalFunc<DataB
             func = new StringMin();
         } else if ("min(bytearray)".equalsIgnoreCase(agg)) {
             func = new MIN();
+        } else if ("min(bigdecimal)".equalsIgnoreCase(agg)) {
+            func = new BigDecimalMin();
         } else if ("max(double)".equalsIgnoreCase(agg)) {
             func = new DoubleMax();
         } else if ("max(float)".equalsIgnoreCase(agg)) {
@@ -363,6 +407,8 @@ public class Over extends EvalFunc<DataB
             func = new StringMax();
         } else if ("max(bytearray)".equalsIgnoreCase(agg)) {
             func = new MAX();
+        } else if ("max(bigdecimal)".equalsIgnoreCase(agg)) {
+            func = new BigDecimalMax();
         } else if ("row_number".equalsIgnoreCase(agg)) {
             func = new RowNumber();
         } else if ("first_value".equalsIgnoreCase(agg)) {

Modified: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/evaluation/TestOver.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/evaluation/TestOver.java?rev=1749956&r1=1749955&r2=1749956&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/evaluation/TestOver.java (original)
+++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/evaluation/TestOver.java Thu Jun 23 18:15:35 2016
@@ -18,12 +18,11 @@
 package org.apache.pig.piggybank.evaluation;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.math.BigDecimal;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Random;
 
 import org.apache.pig.backend.executionengine.ExecException;
@@ -34,8 +33,6 @@ import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
-
-import org.junit.Before;
 import org.junit.Test;
 
 public class TestOver {
@@ -66,11 +63,25 @@ public class TestOver {
         out = func.outputSchema(in);
         assertEquals("{org.apache.pig.piggybank.evaluation.over_3: {result: double}}", out.toString());
 
+        // bigdecimal
+        func = new Over("BIGDECIMAL");
+        in = Schema.generateNestedSchema(DataType.BAG, DataType.INTEGER);
+        out = func.outputSchema(in);
+        assertEquals("{org.apache.pig.piggybank.evaluation.over_4: {result: bigdecimal}}", out.toString());
+        
         // named 
         func = new Over("bob:chararray");
         in = Schema.generateNestedSchema(DataType.BAG, DataType.INTEGER);
         out = func.outputSchema(in);
-        assertEquals("{org.apache.pig.piggybank.evaluation.over_4: {bob: chararray}}", out.toString());
+        assertEquals("{org.apache.pig.piggybank.evaluation.over_5: {bob: chararray}}", out.toString());
+        
+        
+        // Search inner alias and type
+        func = new Over("true");
+        in = Schema.generateNestedSchema(DataType.BAG, DataType.BIGDECIMAL);
+        in.getField(0).schema.getField(0).alias="test";
+        out = func.outputSchema(in);
+        assertEquals("{org.apache.pig.piggybank.evaluation.over_6: {test_over: bigdecimal}}", out.toString());
     }
 
     @Test
@@ -397,6 +408,28 @@ public class TestOver {
             assertEquals(new Long(10), to.get(0));
         }
     }
+    
+    @Test
+    public void testSumBigDecimal() throws Exception {
+    	Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, new BigDecimal(1));
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(4);
+        t.set(0, inbag);
+        t.set(1, "sum(bigdecimal)");
+        t.set(2, -1);
+        t.set(3, -1);
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            assertEquals(new BigDecimal(10), to.get(0));
+        }
+    }
 
     @Test
     public void testAvgDouble() throws Exception {
@@ -509,6 +542,29 @@ public class TestOver {
     }
     
     @Test
+    public void testAvgBigDecimal() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, new BigDecimal(i));
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(4);
+        t.set(0, inbag);
+        t.set(1, "avg(bigdecimal)");
+        t.set(2, -1);
+        t.set(3, -1);
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            assertEquals(new BigDecimal(4.5), to.get(0));
+        }
+    }
+    
+    
+    @Test
     public void testMinDouble() throws Exception {
         Over func = new Over();
         DataBag inbag = BagFactory.getInstance().newDefaultBag();
@@ -627,6 +683,26 @@ public class TestOver {
             assertEquals("0", to.get(0));
         }
     }
+    
+    @Test
+    public void testMinBigDecimal() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0,  new BigDecimal(i));
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, inbag);
+        t.set(1, "min(bigdecimal)");
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            assertEquals(new BigDecimal(0), to.get(0));
+        }
+    }
 
     @Test
     public void testMaxDouble() throws Exception {
@@ -754,6 +830,28 @@ public class TestOver {
             assertEquals("9", to.get(0));
         }
     }
+    
+    @Test
+    public void testMaxBigDecimal() throws Exception {
+        Over func = new Over();
+        DataBag inbag = BagFactory.getInstance().newDefaultBag();
+        for (int i = 0; i < 10; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple(1);
+            t.set(0, new BigDecimal(i));
+            inbag.add(t);
+        }
+        Tuple t = TupleFactory.getInstance().newTuple(2);
+        t.set(0, inbag);
+        t.set(1, "max(bigdecimal)");
+        DataBag outbag = func.exec(t);
+        assertEquals(10, outbag.size());
+        int count = 0;
+        for (Tuple to : outbag) {
+            assertEquals(1, to.size());
+            assertEquals(new BigDecimal(count++), to.get(0));
+        }
+    }
+	
 
     @Test
     public void testRowNumber() throws Exception {