You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2010/02/11 20:34:37 UTC
svn commit: r909116 - in /hadoop/pig/trunk: ./
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/
contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/
src/org/apache/pig/data/
Author: gates
Date: Thu Feb 11 19:34:21 2010
New Revision: 909116
URL: http://svn.apache.org/viewvc?rev=909116&view=rev
Log:
PIG-1217: Fix argToFuncMapping in Piggybank Top function.
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/Top.java
hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/TestTop.java
hadoop/pig/trunk/src/org/apache/pig/data/DataType.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=909116&r1=909115&r2=909116&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Feb 11 19:34:21 2010
@@ -93,6 +93,8 @@
BUG FIXES
+PIG-1217: Fix argToFuncMapping in Piggybank Top function (dvryaboy via gates)
+
PIG-1154: Local Mode fails when hadoop config directory is specified in
classpath (ankit.modi via gates)
Modified: hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/Top.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/Top.java?rev=909116&r1=909115&r2=909116&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/Top.java (original)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/Top.java Thu Feb 11 19:34:21 2010
@@ -23,6 +23,11 @@
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.FuncSpec;
import org.apache.pig.backend.executionengine.ExecException;
@@ -30,11 +35,13 @@
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
/**
- * TopN UDF accepts a bag of tuples and returns top-n tuples depending upon the
+ * Top UDF accepts a bag of tuples and returns top-n tuples depending upon the
* tuple field value of type long. Both n and field number needs to be provided
* to the UDF. The UDF iterates through the input bag and just retains top-n
* tuples by storing them in a priority queue of size n+1 where priority is the
@@ -43,6 +50,8 @@
* UDF is especially helpful for turning the nested grouping operation inside
* out and retaining top-n in a nested group.
*
+ * Assumes all tuples in the bag contain an element of the same type in the compared column.
+ *
* Sample usage:
* A = LOAD 'test.tsv' as (first: chararray, second: chararray);
* B = GROUP A BY (first, second);
@@ -53,95 +62,288 @@
* GENERATE FLATTEN(result);
* }
*/
-public class Top extends EvalFunc<DataBag> {
+public class Top extends EvalFunc<DataBag> implements Algebraic{
+ private static final Log log = LogFactory.getLog(Top.class);
+ static BagFactory mBagFactory = BagFactory.getInstance();
+ static TupleFactory mTupleFactory = TupleFactory.getInstance();
+ private Random randomizer = new Random();
+
+ /**
+  * Orders tuples by the value held in one fixed column.
+  *
+  * The column's Pig type code is looked up once, from the first non-null
+  * value compared, and reused afterwards so the lookup is not repeated on
+  * every call. This relies on the assumption that all tuples carry the same
+  * type in the compared column.
+  */
+ static class TupleComparator implements Comparator<Tuple> {
+     private final int fieldNum;
+     // Cached type code of the compared column; only meaningful once typeFound is true.
+     private byte datatype;
+     private boolean typeFound = false;
+
+     public TupleComparator(int fieldNum) {
+         this.fieldNum = fieldNum;
+     }
+
+     /**
+      * Compares two tuples on the configured column. A null tuple is ordered
+      * before a non-null one.
+      *
+      * @throws RuntimeException wrapping the ExecException raised when the
+      *         column cannot be read from either tuple
+      * @see java.util.Comparator#compare(java.lang.Object, java.lang.Object)
+      */
+     @Override
+     public int compare(Tuple o1, Tuple o2) {
+         if (o1 == null)
+             return -1;
+         if (o2 == null)
+             return 1;
+         try {
+             Object field1 = o1.get(fieldNum);
+             Object field2 = o2.get(fieldNum);
+             if (field1 == null || field2 == null) {
+                 // Never derive the cached column type from a null value: it
+                 // would then be applied to every later non-null comparison.
+                 // The untyped overload determines both types for this pair.
+                 return DataType.compare(field1, field2);
+             }
+             if (!typeFound) {
+                 datatype = DataType.findType(field1);
+                 typeFound = true;
+             }
+             return DataType.compare(field1, field2, datatype, datatype);
+         } catch (ExecException e) {
+             throw new RuntimeException("Error while comparing o1:" + o1
+                     + " and o2:" + o2, e);
+         }
+     }
+ }
+
+ /**
+  * Returns the n tuples of the input bag that rank highest on the chosen column.
+  *
+  * @param tuple (n, column index, bag of tuples to rank)
+  * @return a bag holding at most n tuples, in the priority queue's iteration
+  *         order (not sorted); null when the input is null or has fewer
+  *         than three fields
+  * @throws RuntimeException wrapping any failure raised while reading the
+  *         arguments or ranking the tuples
+  */
+ @Override
+ public DataBag exec(Tuple tuple) throws IOException {
+     if (tuple == null || tuple.size() < 3) {
+         return null;
+     }
+     try {
+         int n = (Integer) tuple.get(0);
+         int fieldNum = (Integer) tuple.get(1);
+         DataBag inputBag = (DataBag) tuple.get(2);
+         PriorityQueue<Tuple> store = new PriorityQueue<Tuple>(n + 1,
+                 new TupleComparator(fieldNum));
+         updateTop(store, n, inputBag);
+         DataBag outputBag = mBagFactory.newDefaultBag();
+         for (Tuple t : store) {
+             outputBag.add(t);
+         }
+         if (log.isDebugEnabled()) {
+             // Sampled: only roughly one call in a thousand dumps its output.
+             if (randomizer.nextInt(1000) == 1) {
+                 log.debug("outputting a bag: ");
+                 for (Tuple t : outputBag)
+                     log.debug("outputting "+t.toDelimitedString("\t"));
+                 log.debug("==================");
+             }
+         }
+         return outputBag;
+     } catch (ExecException e) {
+         throw new RuntimeException("ExecException executing function: ", e);
+     } catch (Exception e) {
+         // Keep the original exception as the cause so its stack trace survives.
+         throw new RuntimeException("General Exception executing function: " + e, e);
+     }
+ }
+
+ /**
+  * Feeds every tuple of inputBag into store, removing the head of the queue
+  * each time the queue holds more than limit entries.
+  *
+  * @param store queue accumulating the retained tuples
+  * @param limit maximum number of tuples to keep in store
+  * @param inputBag tuples to offer to the queue
+  */
+ protected static void updateTop(PriorityQueue<Tuple> store, int limit, DataBag inputBag) {
+     for (Tuple candidate : inputBag) {
+         store.add(candidate);
+         if (store.size() <= limit) {
+             continue;
+         }
+         store.poll();
+     }
+ }
+
+ /**
+  * Declares the one argument signature this function accepts: a single
+  * schema of three fields typed (INTEGER, INTEGER, BAG).
+  *
+  * @return a one-element list holding the FuncSpec for that signature
+  * @see org.apache.pig.EvalFunc#getArgToFuncMapping()
+  */
+ @Override
+ public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
+     List<FieldSchema> argFields = new ArrayList<FieldSchema>(3);
+     // limit
+     argFields.add(new Schema.FieldSchema(null, DataType.INTEGER));
+     // column index
+     argFields.add(new Schema.FieldSchema(null, DataType.INTEGER));
+     // tuples to rank
+     argFields.add(new Schema.FieldSchema(null, DataType.BAG));
+     Schema argSchema = new Schema(argFields);
+
+     List<FuncSpec> mapping = new ArrayList<FuncSpec>(1);
+     mapping.add(new FuncSpec(this.getClass().getName(), argSchema));
+     return mapping;
+ }
+
+ /**
+  * Describes the output as an unnamed bag whose inner schema is that of the
+  * third input field.
+  *
+  * @param input schema of the arguments passed to the function
+  * @return the bag schema, or null when input has fewer than three fields
+  *         or the schema cannot be built
+  */
+ @Override
+ public Schema outputSchema(Schema input) {
+     try {
+         if (input.size() >= 3) {
+             Schema.FieldSchema resultField = new Schema.FieldSchema(null,
+                     input.getField(2).schema, DataType.BAG);
+             return new Schema(resultField);
+         }
+     } catch (Exception e) {
+         // Best effort: any failure is reported as "no schema" below.
+     }
+     return null;
+ }
+
+ /**
+  * @return the class name of the nested Initial function
+  */
+ @Override
+ public String getInitial() {
+     return Initial.class.getName();
+ }
+
+ /**
+  * @return the class name of the nested Intermed function
+  */
+ @Override
+ public String getIntermed() {
+     return Intermed.class.getName();
+ }
+
+ /**
+  * @return the class name of the nested Final function
+  */
+ @Override
+ public String getFinal() {
+     return Final.class.getName();
+ }
+
+ /**
+  * Initial stage of the algebraic evaluation (named by getInitial()).
+  *
+  * Takes the same (limit, index, bag) input as the normal exec code path
+  * but performs no ranking: the bag's tuples are copied into a fresh bag
+  * and re-emitted as a Tuple of the form (Int, Int, DataBag), the same
+  * shape as the expected input.
+  */
+ static public class Initial extends EvalFunc<Tuple> {
+     /**
+      * @param tuple (limit, column index, bag of tuples)
+      * @return (limit, column index, copy of the bag); null when the input
+      *         is null or has fewer than three fields
+      * @throws RuntimeException wrapping any failure raised while reading
+      *         the arguments or building the result
+      */
+     @Override
+     public Tuple exec(Tuple tuple) throws IOException {
+         if (tuple == null || tuple.size() < 3) {
+             return null;
+         }
+
+         try {
+             int n = (Integer) tuple.get(0);
+             int fieldNum = (Integer) tuple.get(1);
+             DataBag inputBag = (DataBag) tuple.get(2);
+             Tuple retTuple = mTupleFactory.newTuple(3);
+             DataBag outputBag = mBagFactory.newDefaultBag();
+             // initially, there should only be one, so not much point in doing the priority queue
+             for (Tuple t : inputBag) {
+                 outputBag.add(t);
+             }
+             retTuple.set(0, n);
+             retTuple.set(1,fieldNum);
+             retTuple.set(2, outputBag);
+             return retTuple;
+         } catch (Exception e) {
+             // Keep the original exception as the cause so its stack trace survives.
+             throw new RuntimeException("General Exception executing function: " + e, e);
+         }
+     }
+ }
+
+ /**
+  * Intermediate stage of the algebraic evaluation (named by getIntermed()).
+  *
+  * The input is a tuple that contains a single bag. This bag contains
+  * outputs of the Initial step -- tuples of the format
+  * (limit, index, { top_tuples }). The partial bags are merged, reduced to
+  * the top tuples again, and returned as a tuple of the same format.
+  */
+ static public class Intermed extends EvalFunc<Tuple> {
+     private static final Log log = LogFactory.getLog(Intermed.class);
+     private final Random randomizer = new Random();
+
+     /**
+      * @param input tuple whose first field is the bag of partial results
+      * @return (limit, index, { top_tuples }); null when the input is null
+      *         or empty, the bag is empty, or the first partial result is
+      *         null or has fewer than three fields
+      * @throws RuntimeException wrapping any failure raised while merging
+      * @see org.apache.pig.EvalFunc#exec(org.apache.pig.data.Tuple)
+      */
+     @Override
+     public Tuple exec(Tuple input) throws IOException {
+         if (input == null || input.size() < 1) {
+             return null;
+         }
+         try {
+             DataBag bagOfIntermediates = (DataBag) input.get(0);
+             Iterator<Tuple> intermediateIterator = bagOfIntermediates.iterator();
+             if (!intermediateIterator.hasNext()) {
+                 return null;
+             }
+             // The limit and column index are read from the first partial result only.
+             Tuple peekTuple = intermediateIterator.next();
+             if (peekTuple == null || peekTuple.size() < 3 ) return null;
+             int n = (Integer) peekTuple.get(0);
+             int fieldNum = (Integer) peekTuple.get(1);
+             DataBag inputBag = (DataBag) peekTuple.get(2);
- BagFactory mBagFactory = BagFactory.getInstance();
+             PriorityQueue<Tuple> store = new PriorityQueue<Tuple>(n + 1,
+                     new TupleComparator(fieldNum));
- static class TupleComparator implements Comparator<Tuple> {
- private int fieldNum;
+             updateTop(store, n, inputBag);
- public TupleComparator(int fieldNum) {
- this.fieldNum = fieldNum;
- }
-
- @Override
- public int compare(Tuple o1, Tuple o2) {
- if (o1 == null)
- return -1;
- if (o2 == null)
- return 1;
- int retValue = 1;
- try {
- long count1 = (Long) o1.get(fieldNum);
- long count2 = (Long) o2.get(fieldNum);
- retValue = (count1 > count2) ? 1 : ((count1 == count2) ? 0 : -1);
- } catch (ExecException e) {
- throw new RuntimeException("Error while comparing o1:" + o1
- + " and o2:" + o2, e);
- }
- return retValue;
- }
- }
-
- @Override
- public DataBag exec(Tuple tuple) throws IOException {
- if (tuple == null || tuple.size() < 3) {
- return null;
- }
- try {
- int n = (Integer) tuple.get(0);
- int fieldNum = (Integer) tuple.get(1);
- DataBag inputBag = (DataBag) tuple.get(2);
- PriorityQueue<Tuple> store = new PriorityQueue<Tuple>(n + 1,
- new TupleComparator(fieldNum));
- Iterator<Tuple> itr = inputBag.iterator();
- while (itr.hasNext()) {
- Tuple t = itr.next();
- store.add(t);
- if (store.size() > n)
- store.poll();
- }
- DataBag outputBag = mBagFactory.newDefaultBag();
- for (Tuple t : store) {
- outputBag.add(t);
- }
- return outputBag;
- } catch (ExecException e) {
- throw new RuntimeException("ExecException executing function: ", e);
- } catch (Exception e) {
- throw new RuntimeException("General Exception executing function: " + e);
- }
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.pig.EvalFunc#getArgToFuncMapping()
- */
- @Override
- public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
- List<FuncSpec> funcList = new ArrayList<FuncSpec>();
- funcList.add(new FuncSpec(this.getClass().getName(), new Schema(
- new Schema.FieldSchema(null, DataType.INTEGER))));
- funcList.add(new FuncSpec(this.getClass().getName(), new Schema(
- new Schema.FieldSchema(null, DataType.INTEGER))));
- funcList.add(new FuncSpec(this.getClass().getName(), new Schema(
- new Schema.FieldSchema(null, DataType.BAG))));
- return funcList;
- }
-
- @Override
- public Schema outputSchema(Schema input) {
- try {
- if (input.size() < 3) {
- return null;
- }
- Schema.FieldSchema bagFs = new Schema.FieldSchema("bag_of_input_tuples",
- input.getField(2).schema, DataType.BAG);
- return new Schema(bagFs);
+             // Fold in the remaining partial results, skipping malformed ones.
+             while (intermediateIterator.hasNext()) {
+                 Tuple t = intermediateIterator.next();
+                 if (t == null || t.size() < 3 ) continue;
+                 updateTop(store, n, (DataBag) t.get(2));
+             }
- } catch (Exception e) {
- return null;
+             DataBag outputBag = mBagFactory.newDefaultBag();
+             for (Tuple t : store) {
+                 outputBag.add(t);
+             }
+             Tuple retTuple = mTupleFactory.newTuple(3);
+             retTuple.set(0, n);
+             retTuple.set(1,fieldNum);
+             retTuple.set(2, outputBag);
+             if (log.isDebugEnabled()) {
+                 // Sampled: only roughly one call in a thousand logs its output.
+                 if (randomizer.nextInt(1000) == 1) log.debug("outputting "+retTuple.toDelimitedString("\t"));
+             }
+             return retTuple;
+         } catch (ExecException e) {
+             throw new RuntimeException("ExecException executing function: ", e);
+         } catch (Exception e) {
+             // Keep the original exception as the cause so its stack trace survives.
+             throw new RuntimeException("General Exception executing function: " + e, e);
+         }
+     }
+
+ }
+
+ /**
+  * Final stage of the algebraic evaluation (named by getFinal()).
+  *
+  * The input to this function is a tuple that contains a single bag. This
+  * bag, in turn, contains outputs of the Intermediate step -- tuples of the
+  * format (limit, index, { top_tuples } ). The partial bags are merged and
+  * the resulting bag of top tuples is returned.
+  */
+ static public class Final extends EvalFunc<DataBag> {
+
+     private static final Log log = LogFactory.getLog(Final.class);
+     private final Random randomizer = new Random();
+
+     /**
+      * @param tuple tuple whose first field is the bag of partial results
+      * @return bag holding at most limit tuples, in the priority queue's
+      *         iteration order (not sorted); null when the input is null or
+      *         empty, the bag is empty, or the first partial result is null
+      *         or has fewer than three fields
+      * @throws RuntimeException wrapping any failure raised while merging
+      * @see org.apache.pig.EvalFunc#exec(org.apache.pig.data.Tuple)
+      */
+     @Override
+     public DataBag exec(Tuple tuple) throws IOException {
+         if (tuple == null || tuple.size() < 1) {
+             return null;
+         }
+         try {
+             DataBag bagOfIntermediates = (DataBag) tuple.get(0);
+             Iterator<Tuple> intermediateIterator = bagOfIntermediates.iterator();
+             if (!intermediateIterator.hasNext()) {
+                 return null;
+             }
+             // The limit and column index are read from the first partial result only.
+             Tuple peekTuple = intermediateIterator.next();
+             if (peekTuple == null || peekTuple.size() < 3 ) return null;
+             int n = (Integer) peekTuple.get(0);
+             int fieldNum = (Integer) peekTuple.get(1);
+             DataBag inputBag = (DataBag) peekTuple.get(2);
+
+             PriorityQueue<Tuple> store = new PriorityQueue<Tuple>(n + 1,
+                     new TupleComparator(fieldNum));
+
+             updateTop(store, n, inputBag);
+
+             // Fold in the remaining partial results, skipping malformed ones.
+             while (intermediateIterator.hasNext()) {
+                 Tuple t = intermediateIterator.next();
+                 if (t == null || t.size() < 3 ) continue;
+                 updateTop(store, n, (DataBag) t.get(2));
+             }
+
+             DataBag outputBag = mBagFactory.newDefaultBag();
+             for (Tuple t : store) {
+                 outputBag.add(t);
+             }
+             if (log.isDebugEnabled()) {
+                 // Sampled: only roughly one call in a thousand logs its output.
+                 if (randomizer.nextInt(1000) == 1) for (Tuple t : outputBag) log.debug("outputting "+t.toDelimitedString("\t"));
+             }
+             return outputBag;
+         } catch (ExecException e) {
+             throw new RuntimeException("ExecException executing function: ", e);
+         } catch (Exception e) {
+             // Keep the original exception as the cause so its stack trace survives.
+             throw new RuntimeException("General Exception executing function: " + e, e);
+         }
+     }
}
- }
}
+
Modified: hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/TestTop.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/TestTop.java?rev=909116&r1=909115&r2=909116&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/TestTop.java (original)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/TestTop.java Thu Feb 11 19:34:21 2010
@@ -17,46 +17,82 @@
*/
package org.apache.pig.piggybank.test.evaluation.util;
-import java.util.Iterator;
+import java.io.IOException;
+import junit.framework.TestCase;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DefaultBagFactory;
import org.apache.pig.data.DefaultTupleFactory;
import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
import org.apache.pig.piggybank.evaluation.util.Top;
import org.junit.Test;
-import junit.framework.TestCase;
-
public class TestTop extends TestCase {
- @Test
- public void testTop() throws Exception {
- Top top = new Top();
- Tuple inputTuple = DefaultTupleFactory.getInstance().newTuple(3);
- // set N = 10 i.e retain top 10 tuples
- inputTuple.set(0, 10);
- // compare tuples by field number 1
- inputTuple.set(1, 1);
- // set the data bag containing the tuples
- DataBag dBag = DefaultBagFactory.getInstance().newDefaultBag();
- inputTuple.set(2, dBag);
- // generate tuples of the form (group-1, 1), (group-2, 2) ...
- for (long i = 0; i < 100; i++) {
- Tuple nestedTuple = DefaultTupleFactory.getInstance().newTuple(2);
- nestedTuple.set(0, "group-" + i);
- nestedTuple.set(1, i);
- dBag.add(nestedTuple);
+ Top top_ = new Top();
+ TupleFactory tupleFactory_ = DefaultTupleFactory.getInstance();
+ BagFactory bagFactory_ = DefaultBagFactory.getInstance();
+ Tuple inputTuple_ = tupleFactory_.newTuple(3);
+ DataBag dBag_ = bagFactory_.newDefaultBag();
+
+ /**
+  * Builds the shared fixture: an input tuple (10, 1, bag) whose bag holds
+  * one hundred tuples. Each test calls this explicitly.
+  */
+ public void setup() throws ExecException {
+     // generate tuples of the form (group-0, 0), (group-1, 1) ... (group-99, 99)
+     long value = 0;
+     while (value < 100) {
+         Tuple row = tupleFactory_.newTuple(2);
+         row.set(0, "group-" + value);
+         row.set(1, value);
+         dBag_.add(row);
+         value++;
+     }
+
+     // N = 10, i.e. retain the top 10 tuples
+     inputTuple_.set(0, 10);
+     // rank tuples by field number 1
+     inputTuple_.set(1, 1);
+     // the data bag containing the tuples
+     inputTuple_.set(2, dBag_);
+ }
+ /**
+  * The plain exec path must keep exactly the ten tuples whose field 1
+  * is greater than 89.
+  */
+ @Test
+ public void testTopExec() throws Exception {
+     setup();
+     DataBag outBag = top_.exec(inputTuple_);
+     // expected value first, so a failure reports the two values the right way round
+     assertEquals(10L, outBag.size());
+     checkItemsGT(outBag, 1, 89);
+ }
+
+ /**
+  * Drives the Initial, Intermed and Final stages by hand, feeding each
+  * stage two copies of the previous stage's output, and checks the values
+  * that survive each merge.
+  */
+ @Test
+ public void testTopAlgebraic() throws IOException {
+     setup();
+     // two initial results
+     Tuple init1 = (new Top.Initial()).exec(inputTuple_);
+     Tuple init2 = (new Top.Initial()).exec(inputTuple_);
+     // two intermediate results
+
+     DataBag intermedBag = bagFactory_.newDefaultBag();
+     intermedBag.add(init1);
+     intermedBag.add(init2);
+     Tuple intermedInput = tupleFactory_.newTuple(intermedBag);
+     Tuple intermedOutput1 = (new Top.Intermed()).exec(intermedInput);
+     Tuple intermedOutput2 = (new Top.Intermed()).exec(intermedInput);
+     // every value now appears twice, so the top ten are all above 94
+     checkItemsGT((DataBag)intermedOutput1.get(2), 1, 94);
+
+     // final result
+     DataBag finalInputBag = bagFactory_.newDefaultBag();
+     finalInputBag.add(intermedOutput1);
+     finalInputBag.add(intermedOutput2);
+     Tuple finalInput = tupleFactory_.newTuple(finalInputBag);
+     DataBag outBag = (new Top.Final()).exec(finalInput);
+     // expected value first, so a failure reports the two values the right way round
+     assertEquals(10L, outBag.size());
+     checkItemsGT(outBag, 1, 96);
}
- DataBag outBag = top.exec(inputTuple);
- super.assertEquals(outBag.size(), 10L);
- Iterator<Tuple> itr = outBag.iterator();
- while (itr.hasNext()) {
- Tuple next = itr.next();
- Long value = (Long) next.get(1);
- super.assertTrue("Value " + value + " exceeded the expected limit",
- value > 89);
+ /**
+  * Asserts that the given field of every tuple is a Long strictly greater
+  * than limit.
+  */
+ private void checkItemsGT(Iterable<Tuple> tuples, int field, int limit) throws ExecException {
+     for (Tuple t : tuples) {
+         Long val = (Long) t.get(field);
+         // The assertion fails when the value is too small, so say that.
+         assertTrue("Value " + val + " is not greater than the expected limit " + limit, val > limit);
+     }
}
- }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/data/DataType.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DataType.java?rev=909116&r1=909115&r2=909116&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DataType.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DataType.java Thu Feb 11 19:34:21 2010
@@ -298,11 +298,22 @@
* @param o2 Second object
* @return -1 if o1 is less, 0 if they are equal, 1 if o2 is less.
*/
- @SuppressWarnings("unchecked")
public static int compare(Object o1, Object o2) {
+
byte dt1 = findType(o1);
byte dt2 = findType(o2);
+ return compare(o1, o2, dt1, dt2);
+ }
+ /*
+ * Same as compare(Object o1, Object o2), but does not use reflection to determine the type
+ * of passed in objects, relying instead on the caller to provide the appropriate values, as
+ * determined by DataType.findType(Object o);
+ *
+ * Use this version in cases where multiple objects of the same type have to be repeatedly compared.
+ */
+ @SuppressWarnings("unchecked")
+ public static int compare(Object o1, Object o2, byte dt1, byte dt2) {
if (dt1 == dt2) {
switch (dt1) {
case NULL: