You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2010/08/05 02:57:55 UTC

svn commit: r982448 - in /hadoop/pig/trunk/contrib: ./ piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/

Author: olga
Date: Thu Aug  5 00:57:54 2010
New Revision: 982448

URL: http://svn.apache.org/viewvc?rev=982448&view=rev
Log:
PIG-1386 UDF to extend functionalities of MaxTupleBy1stField (hcbusy via olgan)

Added:
    hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ExtremalTupleByNthField.java
    hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/TestExtremalTupleByNthField.java
Modified:
    hadoop/pig/trunk/contrib/CHANGES.txt

Modified: hadoop/pig/trunk/contrib/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/CHANGES.txt?rev=982448&r1=982447&r2=982448&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/CHANGES.txt (original)
+++ hadoop/pig/trunk/contrib/CHANGES.txt Thu Aug  5 00:57:54 2010
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-1386 UDF to extend functionalities of MaxTupleBy1stField (hcbusy via olgan)
+
 PIG-1526 improvements to HiveColumnarLoader - Partitioning Support (gerritjvv via olgan)
 
 PIG-1229 allow pig to write output into a JDBC db (ankur via hashutosh)

Added: hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ExtremalTupleByNthField.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ExtremalTupleByNthField.java?rev=982448&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ExtremalTupleByNthField.java (added)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ExtremalTupleByNthField.java Thu Aug  5 00:57:54 2010
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.piggybank.evaluation;
+
+import java.io.IOException;
+import java.lang.reflect.Type;
+
+import org.apache.pig.Accumulator;
+import org.apache.pig.Algebraic;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigProgressable;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * 
+ * This class is similar to MaxTupleBy1stField except that it allows you to
+ * specify which field to use (instead of just using 1st field) and to specify
+ * ascending or descending. The first parameter in the constructor specifies
+ * which field to use and the second parameter to the constructor specifies
+ * which extremal to retrieve. Strings prefixed by "min", "least", "desc",
+ * "small" or "-", irrespective of capitalization and leading white space,
+ * select the minimum; all other strings select the maximum.
+ * 
+ * <h2>Example1: Invoking the UDF</h2> e.g. Using this udf: <br />
+ * <code>
+ * 		define myMin ExtremalTupleByNthField( '4', 'min' ); 
+ * 		T = group G ALL; 
+ * 		R = foreach T generate myMin(G);
+ * </code> is equivalent to:<br />
+ * 
+ * <code>
+ * 		T = order G by $3 asc; 
+ * 		R = limit T 1;
+ * </code>
+ * 
+ * Note above 4 indicates the field with index 3 in the tuple. The 4th field can
+ * be any comparable type, so you can use float, int, string, or even tuples.
+ * 
+ * With the default constructor, this UDF behaves as MaxTupleBy1stField in that it
+ * chooses the max tuple by the comparable in the first field.
+ * 
+ * <h2>Example 2: Default behavior</h2> This class also has a one parameter
+ * constructor that specifies the index and takes the max tuple from the bag.
+ * <code>
+ * 		define myMax ExtremalTupleByNthField( '3' ); 
+ * 		T = group G ALL; 
+ * 		R = foreach T generate myMax(G);
+ * </code> is equivalent to:<br />
+ * 
+ * <code>
+ * 		T = order G by $2 desc; 
+ * 		R = limit T 1;
+ * </code>
+ * 
+ * <h2>Example 3: Choosing a Large Bag or Tuple</h2> Another possible use case
+ * is the choosing of larger or smaller bags/tuples. In pig, bags and tuples are
+ * comparable and the comparison is based on size.
+ * 
+ * <code>
+ *      define biggestBag ExtremalTupleByNthField('1', 'max');
+ * 		R = group TABLE by (key1, key2);
+ *      G = cogroup L by key1, R by group.key1; 
+ * 		V = foreach G generate L, biggestBag(R);
+ * </code>
+ * 
+ * This results in each L(eft) bag associated with only the largest bag from the
+ * R(ight) table. If all bags in R are of equal size, the comparator continues
+ * on to perform element-wise comparison. In case of a complete tie in the
+ * comparison, which result is returned is nondeterministic. But because this
+ * class is able to compare any comparable we are able to specify a secondary
+ * key.
+ * 
+ * <h2>Example 4: Secondary Sort Key</h2>
+ * 
+ * <code>
+ *      define biggestBag ExtremalTupleByNthField('1', 'max');
+ *      G = cogroup L by key1, M by key1, R by key1; 
+ * 		V = foreach G generate FLATTEN(L),
+ *                             biggestBag(R.($0, $1, $2, $5)) as best_result_by_0, 
+ *                             biggestBag(R.($3, $1, $2, $5)) as best_result_by_3,
+ *                             biggestBag(M.($0, $2)) as best_misc_data;
+ * </code>
+ * 
+ * This will generate two sets of results and misc data based on two separate
+ * criteria. Since all tuples in the bags have the same size (4, 4, 2
+ * respectively), the tuple comparator continues on and compares the members of
+ * the tuples until it finds a difference. best_result_by_0 and best_result_by_3
+ * are ordered by the 1st and 4th fields of R's tuples respectively. Within each
+ * group, ties are broken by the second and third fields.
+ * 
+ * 
+ * Finally, note that the udf implements both Algebraic and Accumulator, so it
+ * is relatively efficient because it's a one-pass algorithm.
+ * 
+ * 
+ */
+public class ExtremalTupleByNthField extends EvalFunc<Tuple> implements
+		Algebraic, Accumulator<Tuple> {
+	/**
+	 * Number of items processed between successive progress heartbeats.
+	 * This number has been increased from 10 to reduce verbosity.
+	 */
+	private static final int PROGRESS_FREQUENCY = 10000;
+
+	int fieldIndex;
+	int sign;
+
+	/**
+	 * Constructors.
+	 * 
+	 * The shorter forms delegate to the two-argument form, which stores the
+	 * 0-based field index returned by parseFieldIndex and the +1/-1 sign
+	 * returned by parseOrdering. The ordering defaults to "max" and the
+	 * field position defaults to "1".
+	 * 
+	 * @throws ExecException
+	 *             if parseFieldIndex rejects the index (less than 1)
+	 */
+
+	// no arguments: max by the first field ("1", "max")
+	public ExtremalTupleByNthField() throws ExecException {
+		this("1", "max");
+	}
+
+	// index only: max by the given 1-based field position
+	public ExtremalTupleByNthField(String fieldIndexString)
+			throws ExecException {
+		this(fieldIndexString, "max");
+	}
+
+	public ExtremalTupleByNthField(String fieldIndexString, String order)
+			throws ExecException {
+		super();
+		this.fieldIndex = parseFieldIndex(fieldIndexString); // 0-based index
+		this.sign = parseOrdering(order); // +1 for max, -1 for min
+	}
+
+	/**
+	 * The EvalFunc interface.
+	 * 
+	 * exec hands the whole input tuple to extreme(), which reads the bag in
+	 * field 0 and returns the extremal tuple, or null for an empty bag.
+	 * getReturnType reports Tuple; outputSchema returns its input
+	 * unchanged.
+	 */
+	@Override
+	public Tuple exec(Tuple input) throws IOException {
+		return extreme(fieldIndex, sign, input, reporter);
+	}
+
+	@Override
+	public Type getReturnType() {
+		return Tuple.class;
+	}
+
+	public Schema outputSchema(Schema input) {
+		return input; // the input schema is passed through as-is
+	}
+
+	/**
+	 * Algebraic interface.
+	 * 
+	 * The initial, intermediate and final stages all name the same nested
+	 * HelperClass, whose exec applies extreme() to its input just as the
+	 * outer exec does.
+	 * 
+	 */
+	@Override
+	public String getInitial() {
+		return HelperClass.class.getName();
+	}
+
+	@Override
+	public String getIntermed() {
+		return HelperClass.class.getName();
+	}
+
+	@Override
+	public String getFinal() {
+		return HelperClass.class.getName();
+	}
+
+	/**
+	 * The Accumulator interface. intermediate is the best tuple seen so
+	 * far; tempDb is a reusable scratch bag and parameterToExtreme wraps it
+	 * so that extreme() can compare a new tuple against intermediate.
+	 * 
+	 * NOTE(review): accumulate() handles b as one data tuple, not as a
+	 * tuple wrapping a bag -- confirm against how Pig calls Accumulators.
+	 */
+	Tuple intermediate = null;
+	DataBag tempDb = BagFactory.getInstance().newDefaultBag();
+	Tuple parameterToExtreme = TupleFactory.getInstance().newTuple(tempDb);
+
+	@Override
+	public void accumulate(Tuple b) throws IOException {
+		try {
+			if (b != null) {
+				if (intermediate == null) {
+					// First tuple seen: keep a shallow, field-by-field copy
+					// rather than b itself, in case the Tuple object is reused.
+					intermediate = TupleFactory.getInstance()
+							.newTuple(b.size());
+					for (int i = 0; i < b.size(); ++i) {
+						intermediate.set(i, b.get(i));
+					}
+				} else {
+					tempDb.clear(); // reuse the scratch bag for a two-way comparison
+					tempDb.add(b);
+					tempDb.add(intermediate);
+					intermediate = extreme(fieldIndex, sign,
+							parameterToExtreme, reporter);
+				}
+			}// a null tuple is ignored
+
+		} catch (ExecException ee) {
+			throw ee; // already the right type, pass through unchanged
+		} catch (Exception e) {
+			int errCode = -1;
+			String msg = "Error while computing ExtremalTupleByNthField in "
+					+ this.getClass().getSimpleName();
+			throw new ExecException(msg, errCode, PigException.BUG, e);
+		}
+	}
+
+	@Override
+	public void cleanup() {
+		intermediate = null; // drop the running result so accumulation can start over
+	}
+
+	@Override
+	public Tuple getValue() {
+		return intermediate; // may legitimately be null, e.g. before any tuple is accumulated
+	}
+
+	/**
+	 * Utility classes and methods.
+	 * 
+	 * HelperClass is the EvalFunc named by getInitial, getIntermed and
+	 * getFinal. It parses the same constructor arguments as the outer class
+	 * and its exec simply calls extreme().
+	 */
+	public static final class HelperClass extends EvalFunc<Tuple> {
+		int fieldIndex, sign; // 0-based compare field; +1 for max, -1 for min
+
+		public HelperClass() throws ExecException {
+			this("1", "max"); // same defaults as the outer class
+		}
+
+		public HelperClass(String fieldIndexString) throws ExecException {
+			this(fieldIndexString, "max");
+		}
+
+		public HelperClass(String fieldIndexString, String order)
+				throws ExecException {
+
+			this.fieldIndex = parseFieldIndex(fieldIndexString);
+			this.sign = parseOrdering(order);
+		}
+
+		public Tuple exec(Tuple input) throws IOException {
+			return extreme(fieldIndex, sign, input, reporter);
+		}
+
+	}
+
+	@SuppressWarnings("unchecked") // raw Comparable is used for the field values
+	protected final static Tuple extreme(int pind, int psign, Tuple input,
+			PigProgressable reporter) throws ExecException { // scans the bag for the extremal tuple
+		DataBag values = (DataBag) input.get(0); // the bag to scan is read from field 0
+
+		// An empty bag yields null, in compliance with the SQL standard.
+		// NOTE(review): a null bag in field 0 is not guarded and would NPE here.
+		if (values.size() == 0)
+			return null;
+
+		java.lang.Comparable curMax = null;
+		Tuple curMaxTuple = null;
+		int n = 0;
+		for (Tuple t : values) {
+			if (reporter != null && ++n % PROGRESS_FREQUENCY == 0)
+				reporter.progress();
+			if (t == null) {
+				// null tuples in the bag are skipped
+				continue;
+			}
+			try {
+				Object o = t.get(pind);
+				if (o == null) {
+					// a tuple whose comparison field is null is never
+					// returned, so it is not compared at all.
+					continue;
+				}
+
+				java.lang.Comparable d = (java.lang.Comparable) o;
+
+				if (curMax == null) {
+					curMax = d;
+					curMaxTuple = t;
+				} else {
+					/**
+					 * <pre>
+					 * c > 0 iff ((psign==1 && d>curMax) || (psign==-1 && d<curMax))
+					 * </pre>
+					 * 
+					 * In both cases we want to replace curMax/curMaxTuple by the
+					 * new values
+					 * 
+					 **/
+					int c = psign * d.compareTo(curMax);
+					if (c > 0) { // strict: on a tie the earlier tuple is kept
+						curMax = d;
+						curMaxTuple = t;
+					}
+				}
+			} catch (ExecException ee) {
+				throw ee;
+			} catch (Exception e) {
+				int errCode = -1;
+				String msg = "Error while computing ExtremalTupleByNthField in ExtremalTupleByNthField,";
+				throw new ExecException(msg, errCode, PigException.ERROR, e);
+			}
+		}
+
+		return curMaxTuple; // null if no tuple had a non-null comparison field
+	}
+
+	protected static int parseFieldIndex(String inputFieldIndex)
+			throws ExecException {
+		// parse the user-supplied, 1-based field position
+		int fieldIndex = Integer.valueOf(inputFieldIndex); // NumberFormatException if not an integer
+
+		// convert it to the 0-based index that extreme() passes to Tuple.get
+		--fieldIndex;
+		if (fieldIndex < 0) {
+			throw new ExecException("field index cannot be less than 1:"
+					+ inputFieldIndex, -1, PigException.ERROR, null);
+		}
+		return fieldIndex;
+	}
+
+	protected static int parseOrdering(String order) {
+		// Maps the user-supplied ordering keyword to a comparison sign:
+		// -1 (minimum) when it starts with "min", "desc", "-", "small" or
+		// "least", ignoring case and surrounding whitespace; +1 (maximum)
+		// for anything else. A null order also means maximum, and is tested
+		// before it is dereferenced to avoid a NullPointerException.
+		if (order == null) {
+			return 1;
+		}
+		// A fixed locale keeps the prefixes matching under e.g. a Turkish default.
+		String key = order.toLowerCase(java.util.Locale.ENGLISH).trim();
+		boolean wantsMin = key.startsWith("min") || key.startsWith("desc")
+				|| key.startsWith("-") || key.startsWith("small")
+				|| key.startsWith("least");
+		return wantsMin ? -1 : 1;
+	}
+}
\ No newline at end of file

Added: hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/TestExtremalTupleByNthField.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/TestExtremalTupleByNthField.java?rev=982448&view=auto
==============================================================================
--- hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/TestExtremalTupleByNthField.java (added)
+++ hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/TestExtremalTupleByNthField.java Thu Aug  5 00:57:54 2010
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.piggybank.test.evaluation;
+
+import java.util.Arrays;
+
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.piggybank.evaluation.ExtremalTupleByNthField;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestExtremalTupleByNthField {
+
+	@Test
+	public void testMin() throws Exception {
+		ExtremalTupleByNthField o = new ExtremalTupleByNthField("3", "min"); // min by the 3rd field
+
+		DataBag input = BagFactory.getInstance().newDefaultBag();
+
+		for (int i = 100; i > 0; --i) { // tuples (i, " i", i) for i = 100 down to 1
+			Tuple t = TupleFactory.getInstance().newTuple();
+			t.append(i);
+			t.append(" " + i);
+			t.append(i);
+			input.add(t);
+		}
+
+		Tuple tupleInput = TupleFactory.getInstance().newTuple();
+		tupleInput.append(input); // exec reads the bag from field 0
+
+		Tuple out = o.exec(tupleInput);
+
+		Assert.assertEquals(" 1", (String) out.get(1)); // smallest 3rd field is 1
+	}
+
+	@Test
+	public void testMax() throws Exception {
+		ExtremalTupleByNthField o = new ExtremalTupleByNthField("4", "max"); // max by the 4th field
+
+		DataBag input = BagFactory.getInstance().newDefaultBag();
+
+		for (int i = 0; i < 100; i++) { // tuples (i, " i", i, i) for i = 0..99
+			Tuple t = TupleFactory.getInstance().newTuple();
+			t.append(i);
+			t.append(" " + i);
+			t.append(i);
+			t.append(i);
+			input.add(t);
+		}
+
+		Tuple tupleInput = TupleFactory.getInstance().newTuple();
+		tupleInput.append(input); // exec reads the bag from field 0
+
+		Tuple out = o.exec(tupleInput);
+
+		Assert.assertEquals(" 99", (String) out.get(1)); // largest 4th field is 99
+	}
+
+	@Test
+	public void testMaxComplexKey() throws Exception {
+		ExtremalTupleByNthField o = new ExtremalTupleByNthField("3", "max"); // max by the 3rd field, a tuple
+
+		DataBag input = BagFactory.getInstance().newDefaultBag();
+
+		for (int j = 0; j < 3; ++j) {
+			for (int i = 0; i < 100; i++) {
+				Tuple t = TupleFactory.getInstance().newTuple();
+				t.append(-i);
+				t.append(" " + j + ", " + i);
+				Tuple key = TupleFactory.getInstance().newTuple(); // compound key (j, i)
+				key.append(j);
+				key.append(i);
+				t.append(key);
+				t.append(-i);
+				input.add(t);
+			}
+		}
+
+		Tuple tupleInput = TupleFactory.getInstance().newTuple();
+		tupleInput.append(input); // exec reads the bag from field 0
+
+		Tuple out = o.exec(tupleInput);
+
+		Assert.assertEquals(" 2, 99", (String) out.get(1)); // the key (2, 99) is expected to be the largest
+	}
+
+	@Test
+	public void testMinComplexKey() throws Exception {
+		ExtremalTupleByNthField o = new ExtremalTupleByNthField("3", "min"); // min by the 3rd field, a tuple
+
+		DataBag input = BagFactory.getInstance().newDefaultBag();
+
+		for (int j = 0; j < 3; ++j) {
+			for (int i = 0; i < 100; i++) {
+				Tuple t = TupleFactory.getInstance().newTuple();
+				t.append(-i);
+				t.append(" " + j + ", " + i);
+				Tuple key = TupleFactory.getInstance().newTuple(); // compound key (j, i)
+				key.append(j);
+				key.append(i);
+				t.append(key);
+				t.append(-i);
+				input.add(t);
+			}
+		}
+
+		Tuple tupleInput = TupleFactory.getInstance().newTuple();
+		tupleInput.append(input); // exec reads the bag from field 0
+
+		Tuple out = o.exec(tupleInput);
+
+		Assert.assertEquals(" 0, 0", (String) out.get(1)); // the key (0, 0) is expected to be the smallest
+	}
+
+	@Test
+	public void testMinStringey() throws Exception {
+		ExtremalTupleByNthField o = new ExtremalTupleByNthField("4", "min"); // min by the 4th field, a string
+
+		DataBag input = BagFactory.getInstance().newDefaultBag();
+		Tuple t = TupleFactory.getInstance().newTuple();
+		t.append("a");
+		t.append("a");
+		t.append("a");
+		t.append("min");
+		input.add(t);
+
+		t = TupleFactory.getInstance().newTuple();
+		t.append("b");
+		t.append("b");
+		t.append("b");
+		t.append("max");
+		input.add(t);
+
+		Tuple tupleInput = TupleFactory.getInstance().newTuple();
+		tupleInput.append(input);
+
+		Tuple out = o.exec(tupleInput);
+
+		// the 4th field is compared as a string and "max" sorts before "min", so the "b" tuple is the minimum
+		Assert.assertEquals("b", (String) out.get(1));
+	}
+
+	@Test
+	public void testBiggerBag() throws Exception {
+		ExtremalTupleByNthField o = new ExtremalTupleByNthField("1", "max"); // max by the 1st field, a bag
+
+		DataBag input = BagFactory.getInstance().newDefaultBag();
+		DataBag dbSmaller = BagFactory.getInstance().newDefaultBag(); // holds three tuples
+		dbSmaller.add(TupleFactory.getInstance().newTuple(
+				Arrays.asList("This bag has three items")));
+		dbSmaller.add(TupleFactory.getInstance().newTuple(
+				Arrays.asList("This bag has three items")));
+		dbSmaller.add(TupleFactory.getInstance().newTuple(
+				Arrays.asList("This bag has three items")));
+		input.add(TupleFactory.getInstance().newTuple(
+				Arrays.asList(dbSmaller, "smaller")));
+
+		DataBag dbBigger = BagFactory.getInstance().newDefaultBag(); // holds five tuples, despite their text
+		dbBigger.add(TupleFactory.getInstance().newTuple(
+				Arrays.asList("This bag has four items")));
+		dbBigger.add(TupleFactory.getInstance().newTuple(
+				Arrays.asList("This bag has four items")));
+		dbBigger.add(TupleFactory.getInstance().newTuple(
+				Arrays.asList("This bag has four items")));
+		dbBigger.add(TupleFactory.getInstance().newTuple(
+				Arrays.asList("This bag has four items")));
+		dbBigger.add(TupleFactory.getInstance().newTuple(
+				Arrays.asList("This bag has four items")));
+		input.add(TupleFactory.getInstance().newTuple(
+				Arrays.asList(dbBigger, "bigger")));
+
+		Tuple tupleInput = TupleFactory.getInstance().newTuple();
+		tupleInput.append(input);
+
+		Tuple out = o.exec(tupleInput);
+
+		// DataBags are expected to be ordered by size, so the max is the
+		// tuple holding dbBigger (five tuples versus three in dbSmaller)
+		Assert.assertEquals("bigger", out.get(1));
+	}
+
+	@Test
+	public void testBiggerTuple() throws Exception {
+		ExtremalTupleByNthField o = new ExtremalTupleByNthField("1", "min"); // min by the 1st field, a tuple
+
+		DataBag input = BagFactory.getInstance().newDefaultBag();
+		Tuple tpSmaller = TupleFactory.getInstance().newTuple(); // three fields
+		tpSmaller.append("This is a smaller tuple.");
+		tpSmaller.append("This is a smaller tuple.");
+		tpSmaller.append("This is a smaller tuple.");
+		input.add(TupleFactory.getInstance().newTuple(
+				Arrays.asList(tpSmaller, "smaller")));
+
+		Tuple tpBigger = TupleFactory.getInstance().newTuple(); // four fields
+		tpBigger.append("This is a bigger tuple.");
+		tpBigger.append("This is a bigger tuple.");
+		tpBigger.append("This is a bigger tuple.");
+		tpBigger.append("This is a bigger tuple.");
+		input.add(TupleFactory.getInstance().newTuple(
+				Arrays.asList(tpBigger, "bigger")));
+
+		Tuple tupleInput = TupleFactory.getInstance().newTuple();
+		tupleInput.append(input);
+
+		Tuple out = o.exec(tupleInput);
+
+		// Tuples are expected to be ordered by size first, so with "min" the
+		// result is the tuple holding the three-field tpSmaller
+		Assert.assertEquals("smaller", out.get(1));
+	}
+
+	@Test
+	public void testMaxAccumulated() throws Exception {
+
+		ExtremalTupleByNthField o = new ExtremalTupleByNthField("5", "max"); // max by the 5th field
+
+		for (int j = 0; j < 5; ++j) { // five independent accumulation rounds on one instance
+			for (int i = 0; i < 100; i++) {
+				Tuple t = TupleFactory.getInstance().newTuple();
+				t.append(i + j);
+				t.append(" " + (i + j));
+				t.append(i + j);
+				t.append(i + j);
+				t.append(i + j);
+				o.accumulate(t); // each data tuple is fed to accumulate directly
+			}
+
+			Tuple out = o.getValue();
+			Assert.assertEquals(" " + (99 + j), (String) out.get(1)); // largest value this round is 99 + j
+			o.cleanup(); // reset before the next round
+		}
+	}
+}