You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2016/06/23 18:15:35 UTC
svn commit: r1749956 - in /pig/trunk: CHANGES.txt
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/Over.java
contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/evaluation/TestOver.java
Author: daijy
Date: Thu Jun 23 18:15:35 2016
New Revision: 1749956
URL: http://svn.apache.org/viewvc?rev=1749956&view=rev
Log:
PIG-4906: Add Bigdecimal functions in Over function
Modified:
pig/trunk/CHANGES.txt
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/Over.java
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/evaluation/TestOver.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1749956&r1=1749955&r2=1749956&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Jun 23 18:15:35 2016
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-4906: Add Bigdecimal functions in Over function (cgalan via daijy)
+
PIG-2768: Fix org.apache.hadoop.conf.Configuration deprecation warnings for Hadoop 23 (rohini)
OPTIMIZATIONS
Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/Over.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/Over.java?rev=1749956&r1=1749955&r2=1749956&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/Over.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/Over.java Thu Jun 23 18:15:35 2016
@@ -23,10 +23,13 @@ import java.util.Iterator;
import java.util.List;
import org.apache.pig.EvalFunc;
-import org.apache.pig.FuncSpec;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.builtin.AVG;
+import org.apache.pig.builtin.BigDecimalAvg;
+import org.apache.pig.builtin.BigDecimalMax;
+import org.apache.pig.builtin.BigDecimalMin;
+import org.apache.pig.builtin.BigDecimalSum;
import org.apache.pig.builtin.COUNT;
import org.apache.pig.builtin.DoubleAvg;
import org.apache.pig.builtin.DoubleMax;
@@ -54,6 +57,7 @@ import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
/**
* Given an aggregate function, a bag, and possibly a window definition,
@@ -73,23 +77,27 @@ import org.apache.pig.impl.logicalLayer.
* <li>sum(int)</li>
* <li>sum(long)</li>
* <li>sum(bytearray)</li>
+ * <li>sum(bigdecimal)</li>
* <li>avg(double)</li>
* <li>avg(float)</li>
* <li>avg(long)</li>
* <li>avg(int)</li>
* <li>avg(bytearray)</li>
+ * <li>avg(bigdecimal)</li>
* <li>min(double)</li>
* <li>min(float)</li>
* <li>min(long)</li>
* <li>min(int)</li>
* <li>min(chararray)</li>
* <li>min(bytearray)</li>
+ * <li>min(bigdecimal)</li>
* <li>max(double)</li>
* <li>max(float)</li>
* <li>max(long)</li>
* <li>max(int)</li>
* <li>max(chararray)</li>
* <li>max(bytearray)</li>
+ * <li>max(bigdecimal)</li>
* <li>row_number</li>
* <li>first_value</li>
* <li>last_value</li>
@@ -153,7 +161,8 @@ import org.apache.pig.impl.logicalLayer.
* current row and 3 following) over T;</tt>
*
* <p>Over accepts a constructor argument specifying the name and type,
- * colon-separated, of its return schema.</p>
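+ * colon-separated, of its return schema. If the argument is 'true', the return
+ * schema is instead inferred from the input: Over follows the bag's first field down
+ * through nested single-field tuples/bags and returns a field of the innermost type,
+ * aliased as the original alias plus '_over'.</p>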
*
* <p><pre>
* DEFINE IOver org.apache.pig.piggybank.evaluation.Over('state_rk:int');
@@ -188,12 +197,14 @@ public class Over extends EvalFunc<DataB
private Object[] udfArgs;
private byte returnType;
private String returnName;
+ /** Set by the 'true' constructor argument: outputSchema then infers the result field from the input schema. */
+ private boolean searchInnerType;
+ /** Creates an Over with no declared return type (returnType is DataType.UNKNOWN). */
public Over() {
initialized = false;
udfArgs = null;
func = null;
returnType = DataType.UNKNOWN;
+ searchInnerType = false;
}
public Over(String typespec) {
@@ -202,12 +213,16 @@ public class Over extends EvalFunc<DataB
String[] fn_tn = typespec.split(":", 2);
this.returnName = fn_tn[0];
this.returnType = DataType.findTypeByName(fn_tn[1]);
- } else {
+ } else if(Boolean.parseBoolean(typespec)) {
+ searchInnerType = Boolean.parseBoolean(typespec);
+ }else{
this.returnName = "result";
this.returnType = DataType.findTypeByName(typespec);
- }
+ }
}
+
+
@Override
public DataBag exec(Tuple input) throws IOException {
if (input == null || input.size() < 2) {
@@ -255,19 +270,42 @@ public class Over extends EvalFunc<DataB
@Override
public Schema outputSchema(Schema inputSch) {
try {
- if (returnType == DataType.UNKNOWN) {
+ FieldSchema field;
+
+ if (searchInnerType) {
+ field = new FieldSchema(inputSch.getField(0));
+ while (searchInnerType) {
+ if (field.schema != null
+ && field.schema.getFields().size() > 1) {
+ searchInnerType = false;
+ } else {
+ if (field.type == DataType.TUPLE
+ || field.type == DataType.BAG) {
+ field = new FieldSchema(field.schema.getField(0));
+ } else {
+ field.alias = field.alias + "_over";
+ searchInnerType = false;
+ }
+ }
+ }
+
+ searchInnerType = true;
+ } else if (returnType == DataType.UNKNOWN) {
return Schema.generateNestedSchema(DataType.BAG, DataType.NULL);
} else {
- Schema outputTupleSchema = new Schema(new Schema.FieldSchema(returnName, returnType));
- return new Schema(new Schema.FieldSchema(
- getSchemaName(this.getClass().getName().toLowerCase(), inputSch),
- outputTupleSchema,
- DataType.BAG));
+ field = new Schema.FieldSchema(returnName, returnType);
}
+
+ Schema outputTupleSchema = new Schema(field);
+ return new Schema(new Schema.FieldSchema(getSchemaName(this
+ .getClass().getName().toLowerCase(), inputSch),
+ outputTupleSchema, DataType.BAG));
+
} catch (FrontendException fe) {
throw new RuntimeException("Unable to create nested schema", fe);
}
}
+
private void init(Tuple input) throws IOException {
initialized = true;
@@ -329,6 +367,8 @@ public class Over extends EvalFunc<DataB
func = new LongSum();
} else if ("sum(bytearray)".equalsIgnoreCase(agg)) {
func = new SUM();
+ } else if ("sum(bigdecimal)".equalsIgnoreCase(agg)) {
+ func = new BigDecimalSum();
} else if ("avg(double)".equalsIgnoreCase(agg)) {
func = new DoubleAvg();
} else if ("avg(float)".equalsIgnoreCase(agg)) {
@@ -339,6 +379,8 @@ public class Over extends EvalFunc<DataB
func = new IntAvg();
} else if ("avg(bytearray)".equalsIgnoreCase(agg)) {
func = new AVG();
+ } else if ("avg(bigdecimal)".equalsIgnoreCase(agg)) {
+ func = new BigDecimalAvg();
} else if ("min(double)".equalsIgnoreCase(agg)) {
func = new DoubleMin();
} else if ("min(float)".equalsIgnoreCase(agg)) {
@@ -351,6 +393,8 @@ public class Over extends EvalFunc<DataB
func = new StringMin();
} else if ("min(bytearray)".equalsIgnoreCase(agg)) {
func = new MIN();
+ } else if ("min(bigdecimal)".equalsIgnoreCase(agg)) {
+ func = new BigDecimalMin();
} else if ("max(double)".equalsIgnoreCase(agg)) {
func = new DoubleMax();
} else if ("max(float)".equalsIgnoreCase(agg)) {
@@ -363,6 +407,8 @@ public class Over extends EvalFunc<DataB
func = new StringMax();
} else if ("max(bytearray)".equalsIgnoreCase(agg)) {
func = new MAX();
+ } else if ("max(bigdecimal)".equalsIgnoreCase(agg)) {
+ func = new BigDecimalMax();
} else if ("row_number".equalsIgnoreCase(agg)) {
func = new RowNumber();
} else if ("first_value".equalsIgnoreCase(agg)) {
Modified: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/evaluation/TestOver.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/evaluation/TestOver.java?rev=1749956&r1=1749955&r2=1749956&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/evaluation/TestOver.java (original)
+++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/evaluation/TestOver.java Thu Jun 23 18:15:35 2016
@@ -18,12 +18,11 @@
package org.apache.pig.piggybank.evaluation;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import java.math.BigDecimal;
import java.util.Iterator;
-import java.util.List;
import java.util.Random;
import org.apache.pig.backend.executionengine.ExecException;
@@ -34,8 +33,6 @@ import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.schema.Schema;
-
-import org.junit.Before;
import org.junit.Test;
public class TestOver {
@@ -66,11 +63,25 @@ public class TestOver {
out = func.outputSchema(in);
assertEquals("{org.apache.pig.piggybank.evaluation.over_3: {result: double}}", out.toString());
+ // bigdecimal
+ func = new Over("BIGDECIMAL");
+ in = Schema.generateNestedSchema(DataType.BAG, DataType.INTEGER);
+ out = func.outputSchema(in);
+ assertEquals("{org.apache.pig.piggybank.evaluation.over_4: {result: bigdecimal}}", out.toString());
+
// named
func = new Over("bob:chararray");
in = Schema.generateNestedSchema(DataType.BAG, DataType.INTEGER);
out = func.outputSchema(in);
- assertEquals("{org.apache.pig.piggybank.evaluation.over_4: {bob: chararray}}", out.toString());
+ assertEquals("{org.apache.pig.piggybank.evaluation.over_5: {bob: chararray}}", out.toString());
+
+
+ // Search inner alias and type
+ func = new Over("true");
+ in = Schema.generateNestedSchema(DataType.BAG, DataType.BIGDECIMAL);
+ in.getField(0).schema.getField(0).alias="test";
+ out = func.outputSchema(in);
+ assertEquals("{org.apache.pig.piggybank.evaluation.over_6: {test_over: bigdecimal}}", out.toString());
}
@Test
@@ -397,6 +408,28 @@ public class TestOver {
assertEquals(new Long(10), to.get(0));
}
}
+
+ @Test
+ public void testSumBigDecimal() throws Exception {
+ // Ten rows of 1; with both window bounds set to -1 every output row is expected to be 10.
+ DataBag values = BagFactory.getInstance().newDefaultBag();
+ for (int row = 0; row < 10; row++) {
+ Tuple single = TupleFactory.getInstance().newTuple(1);
+ single.set(0, BigDecimal.ONE);
+ values.add(single);
+ }
+ Tuple args = TupleFactory.getInstance().newTuple(4);
+ args.set(0, values);
+ args.set(1, "sum(bigdecimal)");
+ args.set(2, -1);
+ args.set(3, -1);
+ DataBag result = new Over().exec(args);
+ assertEquals(10, result.size());
+ for (Tuple out : result) {
+ assertEquals(1, out.size());
+ assertEquals(BigDecimal.TEN, out.get(0));
+ }
+ }
@Test
public void testAvgDouble() throws Exception {
@@ -509,6 +542,29 @@ public class TestOver {
}
@Test
+ public void testAvgBigDecimal() throws Exception {
+ Over func = new Over();
+ DataBag inbag = BagFactory.getInstance().newDefaultBag();
+ for (int i = 0; i < 10; i++) {
+ Tuple t = TupleFactory.getInstance().newTuple(1);
+ t.set(0, new BigDecimal(i));
+ inbag.add(t);
+ }
+ Tuple t = TupleFactory.getInstance().newTuple(4);
+ t.set(0, inbag);
+ t.set(1, "avg(bigdecimal)");
+ t.set(2, -1);
+ t.set(3, -1);
+ DataBag outbag = func.exec(t);
+ assertEquals(10, outbag.size());
+ // The mean of 0..9 is 4.5. Build the expectation from a String: new BigDecimal(double)
+ // carries the binary floating-point value and is only coincidentally exact for 4.5.
+ // BigDecimal.equals also compares scale, so "4.5" (scale 1) is the intended form.
+ BigDecimal expected = new BigDecimal("4.5");
+ for (Tuple to : outbag) {
+ assertEquals(1, to.size());
+ assertEquals(expected, to.get(0));
+ }
+ }
+
+
+ @Test
public void testMinDouble() throws Exception {
Over func = new Over();
DataBag inbag = BagFactory.getInstance().newDefaultBag();
@@ -627,6 +683,26 @@ public class TestOver {
assertEquals("0", to.get(0));
}
}
+
+ @Test
+ public void testMinBigDecimal() throws Exception {
+ DataBag values = BagFactory.getInstance().newDefaultBag();
+ for (int n = 0; n < 10; n++) {
+ Tuple row = TupleFactory.getInstance().newTuple(1);
+ row.set(0, BigDecimal.valueOf(n));
+ values.add(row);
+ }
+ // No window arguments: only the bag and the aggregate name are passed.
+ Tuple args = TupleFactory.getInstance().newTuple(2);
+ args.set(0, values);
+ args.set(1, "min(bigdecimal)");
+ DataBag result = new Over().exec(args);
+ assertEquals(10, result.size());
+ for (Tuple out : result) {
+ assertEquals(1, out.size());
+ assertEquals(BigDecimal.ZERO, out.get(0));
+ }
+ }
@Test
public void testMaxDouble() throws Exception {
@@ -754,6 +830,28 @@ public class TestOver {
assertEquals("9", to.get(0));
}
}
+
+ @Test
+ public void testMaxBigDecimal() throws Exception {
+ DataBag values = BagFactory.getInstance().newDefaultBag();
+ for (int n = 0; n < 10; n++) {
+ Tuple row = TupleFactory.getInstance().newTuple(1);
+ row.set(0, BigDecimal.valueOf(n));
+ values.add(row);
+ }
+ Tuple args = TupleFactory.getInstance().newTuple(2);
+ args.set(0, values);
+ args.set(1, "max(bigdecimal)");
+ DataBag result = new Over().exec(args);
+ assertEquals(10, result.size());
+ // Input is ascending, so the value expected at output row k is k itself.
+ Iterator<Tuple> it = result.iterator();
+ for (long expected = 0; it.hasNext(); expected++) {
+ Tuple out = it.next();
+ assertEquals(1, out.size());
+ assertEquals(BigDecimal.valueOf(expected), out.get(0));
+ }
+ }
+
@Test
public void testRowNumber() throws Exception {