Posted to commits@pig.apache.org by ga...@apache.org on 2009/01/22 17:08:56 UTC

svn commit: r736685 - in /hadoop/pig/trunk: CHANGES.txt contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/MaxTupleBy1stField.java

Author: gates
Date: Thu Jan 22 08:08:55 2009
New Revision: 736685

URL: http://svn.apache.org/viewvc?rev=736685&view=rev
Log:
PIG-620 Added MaxTupleBy1stField UDF to piggybank.


Added:
    hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/MaxTupleBy1stField.java
Modified:
    hadoop/pig/trunk/CHANGES.txt

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=736685&r1=736684&r2=736685&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Jan 22 08:08:55 2009
@@ -8,6 +8,8 @@
 
     PIG-554 Added fragment replicate map side join (shravanmn via pkamath and gates)
 
+	PIG-620: Added MaxTupleBy1stField UDF to piggybank (vzaliva via gates)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -365,3 +367,4 @@
     PIG-623: Fix spelling errors in output messages (tomwhite via sms)
 
     PIG-622: Include pig executable in distribution (tomwhite via sms)
+

Added: hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/MaxTupleBy1stField.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/MaxTupleBy1stField.java?rev=736685&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/MaxTupleBy1stField.java (added)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/MaxTupleBy1stField.java Thu Jan 22 08:08:55 2009
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.piggybank.evaluation;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.pig.Algebraic;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigProgressable;
+import org.apache.pig.data.*;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * The MaxTupleBy1stField UDF returns the tuple with the maximum value of the
+ * first field in a given bag.
+ *
+ * Caveat: the first field is assumed to have type 'long'. You may need to enforce this
+ * via a schema when loading data, as shown in the sample usage below.
+ * 
+ * Sample usage: 
+ * 
+ * A = load 'test.tsv' as (first: long, second, third); 
+ * B = GROUP A by second; 
+ * C = FOREACH B GENERATE group, MaxTupleBy1stField(A);
+ * 
+ * @author Vadim Zaliva <lo...@codemindes.com>
+ */
+public class MaxTupleBy1stField extends EvalFunc<Tuple> implements Algebraic
+{
+    /**
+     * Indicates how many items should be processed between progress heartbeats.
+     */
+    private static final int PROGRESS_FREQUENCY = 10;
+
+    static public class Initial extends EvalFunc<Tuple>
+    {
+        //TODO: private static TupleFactory tfact = TupleFactory.getInstance();
+
+        @Override
+        public Tuple exec(Tuple input) throws IOException
+        {
+            try
+            {
+                // input is a bag with one tuple containing
+                // the column we are trying to max on
+                DataBag bg = (DataBag) input.get(0);
+                Tuple tp = bg.iterator().next();
+                return tp; //TODO: copy?
+            } catch(ExecException ee)
+            {
+                IOException oughtToBeEE = new IOException();
+                oughtToBeEE.initCause(ee);
+                throw oughtToBeEE;
+            }
+        }
+    }
+
+    public Schema outputSchema(Schema input) 
+    {
+        return input;
+    }
+
+    static public class Intermediate extends EvalFunc<Tuple>
+    {
+        //TODO: private static TupleFactory tfact = TupleFactory.getInstance();
+
+        @Override
+        public Tuple exec(Tuple input) throws IOException
+        {
+            try
+            {
+                return max(input, reporter);
+            } catch(ExecException ee)
+            {
+                IOException oughtToBeEE = new IOException();
+                oughtToBeEE.initCause(ee);
+                throw oughtToBeEE;
+            }
+        }
+    }
+
+    static public class Final extends EvalFunc<Tuple>
+    {
+        @Override
+        public Tuple exec(Tuple input) throws IOException
+        {
+            try
+            {
+                return max(input, reporter);
+            } catch(ExecException ee)
+            {
+                IOException oughtToBeEE = new IOException();
+                oughtToBeEE.initCause(ee);
+                throw oughtToBeEE;
+            }
+        }
+    }
+
+    @Override
+    public Tuple exec(Tuple input) throws IOException
+    {
+        try
+        {
+            return max(input, reporter);
+        } catch(ExecException ee)
+        {
+            IOException oughtToBeEE = new IOException();
+            oughtToBeEE.initCause(ee);
+            throw oughtToBeEE;
+        }
+    }
+
+    protected static Tuple max(Tuple input, PigProgressable reporter) throws ExecException
+    {
+        DataBag values = (DataBag) input.get(0);
+
+        // if we were handed an empty bag, return NULL
+        // this is in compliance with SQL standard
+        if(values.size() == 0)
+            return null;
+
+        long curMax = 0;
+        Tuple curMaxTuple = null;
+        int n=0;
+        for(Iterator<Tuple> it = values.iterator(); it.hasNext();)
+        {
+            if(reporter!=null && ++n%PROGRESS_FREQUENCY==0)
+                reporter.progress();
+            Tuple t = it.next();
+            try
+            {
+                long d = (Long) t.get(0);
+                if(curMaxTuple == null || d > curMax)
+                {
+                    curMax = d;
+                    curMaxTuple = t;
+                }
+
+            } catch(RuntimeException exp)
+            {
+                ExecException newE = new ExecException("Error processing: " + t.toString() + ": " + exp.getMessage());
+                newE.initCause(exp);
+                throw newE;
+            }
+        }
+
+        return curMaxTuple;
+    }
+
+    @Override
+    public String getInitial()
+    {
+        return Initial.class.getName();
+    }
+
+    @Override
+    public String getIntermed()
+    {
+        return Intermediate.class.getName();
+    }
+
+    @Override
+    public String getFinal()
+    {
+        return Final.class.getName();
+    }
+
+}
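
For reference, a minimal Pig Latin sketch of how this UDF might be invoked once the piggybank jar is built. The jar path is a placeholder and the DEFINE alias is an assumption; only the fully qualified class name (from the package declaration above) and the per-group usage pattern come from this commit's Javadoc sample:

-- Register the piggybank jar (path is a placeholder; adjust to your build output)
REGISTER /path/to/piggybank.jar;

-- Bind a short alias to the UDF's fully qualified class name
DEFINE MaxTupleBy1stField org.apache.pig.piggybank.evaluation.MaxTupleBy1stField();

-- First field must be declared as long, per the caveat in the Javadoc
A = LOAD 'test.tsv' AS (first: long, second, third);
B = GROUP A BY second;
C = FOREACH B GENERATE group, MaxTupleBy1stField(A);
DUMP C;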