You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2009/01/22 17:08:56 UTC
svn commit: r736685 - in /hadoop/pig/trunk: CHANGES.txt
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/MaxTupleBy1stField.java
Author: gates
Date: Thu Jan 22 08:08:55 2009
New Revision: 736685
URL: http://svn.apache.org/viewvc?rev=736685&view=rev
Log:
PIG-620 Added MaxTupleBy1stField UDF to piggybank.
Added:
hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/MaxTupleBy1stField.java
Modified:
hadoop/pig/trunk/CHANGES.txt
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=736685&r1=736684&r2=736685&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Jan 22 08:08:55 2009
@@ -8,6 +8,8 @@
PIG-554 Added fragment replicate map side join (shravanmn via pkamath and gates)
+ PIG-620: Added MaxTupleBy1stField UDF to piggybank (vzaliva via gates)
+
OPTIMIZATIONS
BUG FIXES
@@ -365,3 +367,4 @@
PIG-623: Fix spelling errors in output messages (tomwhite via sms)
PIG-622: Include pig executable in distribution (tomwhite via sms)
+
Added: hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/MaxTupleBy1stField.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/MaxTupleBy1stField.java?rev=736685&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/MaxTupleBy1stField.java (added)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/MaxTupleBy1stField.java Thu Jan 22 08:08:55 2009
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.piggybank.evaluation;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.pig.Algebraic;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigProgressable;
+import org.apache.pig.data.*;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * MaxTupleBy1stField UDF returns the tuple of a given bag whose first field
+ * holds the maximum value.
+ *
+ * Caveat: the first field is assumed to have type 'long'. You may need to
+ * enforce this via a schema when loading data, as shown in the sample usage
+ * below. Tuples whose first field is null are ignored (as SQL MAX ignores
+ * nulls); if the bag is null, empty, or has no non-null first field, the
+ * result is null. On ties, the first tuple encountered wins.
+ *
+ * Sample usage:
+ * <pre>
+ * A = load 'test.tsv' as (first: long, second, third);
+ * B = GROUP A by second;
+ * C = FOREACH B GENERATE group, MaxTupleBy1stField(A);
+ * </pre>
+ *
+ * @author Vadim Zaliva <lo...@codemindes.com>
+ */
+public class MaxTupleBy1stField extends EvalFunc<Tuple> implements Algebraic
+{
+    /**
+     * Number of bag items processed between progress heartbeats.
+     */
+    private static final int PROGRESS_FREQUENCY = 10;
+
+    /**
+     * Algebraic initial stage: returns the single tuple of the input bag
+     * unchanged, so later stages can compare it by its first field.
+     */
+    public static class Initial extends EvalFunc<Tuple>
+    {
+        @Override
+        public Tuple exec(Tuple input) throws IOException
+        {
+            try
+            {
+                // input is a bag with one tuple containing
+                // the column we are trying to max on
+                DataBag bg = (DataBag) input.get(0);
+                // TODO: copy? The tuple is returned as-is, not copied.
+                return bg.iterator().next();
+            } catch(ExecException ee)
+            {
+                throw wrap(ee);
+            }
+        }
+    }
+
+    /**
+     * Returns the input schema unchanged.
+     *
+     * NOTE(review): the input schema describes the bag argument, while exec()
+     * returns a single tuple taken from that bag, so this may not be the
+     * precise output schema -- confirm against how downstream operators use it.
+     */
+    @Override
+    public Schema outputSchema(Schema input)
+    {
+        return input;
+    }
+
+    /**
+     * Algebraic intermediate stage: reduces a bag of partial results to the
+     * tuple with the largest first field.
+     */
+    public static class Intermediate extends EvalFunc<Tuple>
+    {
+        @Override
+        public Tuple exec(Tuple input) throws IOException
+        {
+            try
+            {
+                return max(input, reporter);
+            } catch(ExecException ee)
+            {
+                throw wrap(ee);
+            }
+        }
+    }
+
+    /**
+     * Algebraic final stage: reduces the intermediate results to the tuple
+     * with the largest first field.
+     */
+    public static class Final extends EvalFunc<Tuple>
+    {
+        @Override
+        public Tuple exec(Tuple input) throws IOException
+        {
+            try
+            {
+                return max(input, reporter);
+            } catch(ExecException ee)
+            {
+                throw wrap(ee);
+            }
+        }
+    }
+
+    /**
+     * Non-algebraic entry point: returns the tuple of the bag in field 0 of
+     * {@code input} whose first field is the largest.
+     */
+    @Override
+    public Tuple exec(Tuple input) throws IOException
+    {
+        try
+        {
+            return max(input, reporter);
+        } catch(ExecException ee)
+        {
+            throw wrap(ee);
+        }
+    }
+
+    /**
+     * Wraps an ExecException in an IOException so it can be thrown from
+     * exec(), keeping the original as the cause and surfacing its message.
+     */
+    private static IOException wrap(ExecException ee)
+    {
+        IOException oughtToBeEE = new IOException(ee.getMessage());
+        oughtToBeEE.initCause(ee);
+        return oughtToBeEE;
+    }
+
+    /**
+     * Finds the tuple whose first field holds the maximum value.
+     *
+     * @param input    a tuple whose field 0 is a DataBag of candidate tuples
+     * @param reporter progress reporter pinged every PROGRESS_FREQUENCY
+     *                 items; may be null
+     * @return the first tuple encountered with the maximum first field, or
+     *         null if the bag is null, empty, or no tuple has a non-null
+     *         first field
+     * @throws ExecException if a tuple cannot be read or its first field is
+     *         not a Long
+     */
+    protected static Tuple max(Tuple input, PigProgressable reporter) throws ExecException
+    {
+        DataBag values = (DataBag) input.get(0);
+
+        // if we were handed a null or empty bag, return NULL
+        // this is in compliance with SQL standard
+        if(values == null || values.size() == 0)
+        {
+            return null;
+        }
+
+        long curMax = 0;
+        Tuple curMaxTuple = null;
+        int n = 0;
+        for(Iterator<Tuple> it = values.iterator(); it.hasNext();)
+        {
+            if(reporter != null && ++n % PROGRESS_FREQUENCY == 0)
+            {
+                reporter.progress();
+            }
+            Tuple t = it.next();
+            // A null tuple cannot hold the maximum; skip it instead of
+            // failing on t.get() (and again on t.toString() in the handler).
+            if(t == null)
+            {
+                continue;
+            }
+            try
+            {
+                Object first = t.get(0);
+                // Like SQL MAX, ignore null values rather than letting the
+                // unboxing below throw and abort the whole bag.
+                if(first == null)
+                {
+                    continue;
+                }
+                long d = (Long) first;
+                // curMaxTuple == null marks "no candidate yet", so curMax's
+                // initial 0 never wins against negative values.
+                if(curMaxTuple == null || d > curMax)
+                {
+                    curMax = d;
+                    curMaxTuple = t;
+                }
+            } catch(RuntimeException exp)
+            {
+                ExecException newE = new ExecException("Error processing: " + t + ": " + exp.getMessage());
+                newE.initCause(exp);
+                throw newE;
+            }
+        }
+
+        return curMaxTuple;
+    }
+
+    @Override
+    public String getInitial()
+    {
+        return Initial.class.getName();
+    }
+
+    @Override
+    public String getIntermed()
+    {
+        return Intermediate.class.getName();
+    }
+
+    @Override
+    public String getFinal()
+    {
+        return Final.class.getName();
+    }
+
+}