You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by dv...@apache.org on 2010/08/24 23:11:16 UTC
svn commit: r988730 - in /hadoop/pig/trunk: ./ src/org/apache/pig/builtin/
test/org/apache/pig/test/
Author: dvryaboy
Date: Tue Aug 24 21:11:16 2010
New Revision: 988730
URL: http://svn.apache.org/viewvc?rev=988730&view=rev
Log:
PIG-1551 Improve dynamic invokers to deal with no-arg methods and array parameters
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/builtin/GenericInvoker.java
hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForDouble.java
hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForFloat.java
hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForInt.java
hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForLong.java
hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForString.java
hadoop/pig/trunk/src/org/apache/pig/builtin/Invoker.java
hadoop/pig/trunk/test/org/apache/pig/test/TestInvoker.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=988730&r1=988729&r2=988730&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Aug 24 21:11:16 2010
@@ -26,6 +26,8 @@ PIG-1249: Safe-guards against misconfigu
IMPROVEMENTS
+PIG-1551: Improve dynamic invokers to deal with no-arg methods and array parameters (dvryaboy)
+
PIG-1311: Document audience and stability for remaining interfaces (gates)
PIG-506: Does pig need a NATIVE keyword? (aniket486 via thejas)
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/GenericInvoker.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/GenericInvoker.java?rev=988730&r1=988729&r2=988730&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/GenericInvoker.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/GenericInvoker.java Tue Aug 24 21:11:16 2010
@@ -32,9 +32,18 @@ import org.apache.pig.impl.logicalLayer.
* Class-specific non-generic extensions of this class are needed for Pig to know what type
* of return to expect from exec, and to find the appropriate classes through reflection.
* All they have to do is implement the constructors that call into super(). Note that the
- * no-parameter constructor is <b>required</b>, if nonsensical, for Pig to do its work.
+ * no-parameter constructor is <b>required</b>, if seemingly nonsensical, for Pig to do its work.
+ * <p>
+ * The Invoker family of UDFs understands the following class names (all case-insensitive):<ul>
+ * <li>String
+ * <li>Long
+ * <li>Float
+ * <li>Double
+ * <li>Int</ul>
+ * <p>
+ * Invokers can also work with array arguments, represented in Pig as DataBags in which the first
+ * field of each tuple is one array element. Declare such a parameter as <code>string[]</code>, <code>long[]</code>, etc.
* <p>
- *
* This UDF allows one to dynamically invoke Java methods that return a <code>T</code>
* <p>
* Usage of the Invoker family of UDFs (adjust as appropriate):
@@ -54,6 +63,7 @@ import org.apache.pig.impl.logicalLayer.
* The first argument to the constructor is the full path to desired method.<br>
* The second argument is a list of classes of the method parameters.<br>
* If the method is not static, the first element in this list is the object to invoke the method on.<br>
+ * The second argument is optional (a no-argument static method is assumed if it is not supplied).<br>
* The third argument is the keyword "static" (or "true") to signify that the method is static. <br>
* The third argument is optional, and true by default.<br>
* <p>
@@ -65,6 +75,11 @@ public abstract class GenericInvoker<T>
public GenericInvoker() {}
+ public GenericInvoker(String fullName)
+ throws ClassNotFoundException, FrontendException, SecurityException, NoSuchMethodException {
+ invoker_ = new Invoker<T>(fullName, "");
+ }
+
public GenericInvoker(String fullName, String paramSpecsStr)
throws ClassNotFoundException, FrontendException, SecurityException, NoSuchMethodException {
invoker_ = new Invoker<T>(fullName, paramSpecsStr);
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForDouble.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForDouble.java?rev=988730&r1=988729&r2=988730&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForDouble.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForDouble.java Tue Aug 24 21:11:16 2010
@@ -26,6 +26,10 @@ public class InvokeForDouble extends Gen
public InvokeForDouble() {}
+ public InvokeForDouble(String fullName) throws FrontendException, SecurityException, ClassNotFoundException, NoSuchMethodException {
+ super(fullName);
+ }
+
public InvokeForDouble(String fullName, String paramSpecsStr) throws FrontendException, SecurityException, ClassNotFoundException, NoSuchMethodException {
super(fullName, paramSpecsStr);
}
@@ -34,4 +38,6 @@ public class InvokeForDouble extends Gen
throws ClassNotFoundException, FrontendException, SecurityException, NoSuchMethodException {
super(fullName, paramSpecsStr, isStatic);
}
+
+
}
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForFloat.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForFloat.java?rev=988730&r1=988729&r2=988730&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForFloat.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForFloat.java Tue Aug 24 21:11:16 2010
@@ -27,6 +27,10 @@ public class InvokeForFloat extends Gene
public InvokeForFloat() {}
+ public InvokeForFloat(String fullName) throws FrontendException, SecurityException, ClassNotFoundException, NoSuchMethodException {
+ super(fullName);
+ }
+
public InvokeForFloat(String fullName, String paramSpecsStr) throws FrontendException, SecurityException, ClassNotFoundException, NoSuchMethodException {
super(fullName, paramSpecsStr);
}
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForInt.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForInt.java?rev=988730&r1=988729&r2=988730&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForInt.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForInt.java Tue Aug 24 21:11:16 2010
@@ -30,6 +30,10 @@ public class InvokeForInt extends Generi
super(fullName, paramSpecsStr);
}
+ public InvokeForInt(String fullName) throws FrontendException, SecurityException, ClassNotFoundException, NoSuchMethodException {
+ super(fullName);
+ }
+
public InvokeForInt(String fullName, String paramSpecsStr, String isStatic)
throws ClassNotFoundException, FrontendException, SecurityException, NoSuchMethodException {
super(fullName, paramSpecsStr, isStatic);
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForLong.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForLong.java?rev=988730&r1=988729&r2=988730&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForLong.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForLong.java Tue Aug 24 21:11:16 2010
@@ -26,6 +26,10 @@ public class InvokeForLong extends Gener
public InvokeForLong() {}
+ public InvokeForLong(String fullName) throws FrontendException, SecurityException, ClassNotFoundException, NoSuchMethodException {
+ super(fullName);
+ }
+
public InvokeForLong(String fullName, String paramSpecsStr) throws FrontendException, SecurityException, ClassNotFoundException, NoSuchMethodException {
super(fullName, paramSpecsStr);
}
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForString.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForString.java?rev=988730&r1=988729&r2=988730&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForString.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/InvokeForString.java Tue Aug 24 21:11:16 2010
@@ -31,6 +31,10 @@ public class InvokeForString extends Gen
super(fullName, paramSpecsStr);
}
+ public InvokeForString(String fullName) throws FrontendException, SecurityException, ClassNotFoundException, NoSuchMethodException {
+ super(fullName);
+ }
+
public InvokeForString(String fullName, String paramSpecsStr, String isStatic)
throws ClassNotFoundException, FrontendException, SecurityException, NoSuchMethodException {
super(fullName, paramSpecsStr, isStatic);
Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/Invoker.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/Invoker.java?rev=988730&r1=988729&r2=988730&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/Invoker.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/Invoker.java Tue Aug 24 21:11:16 2010
@@ -23,13 +23,34 @@ import java.lang.reflect.InvocationTarge
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.FrontendException;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
public class Invoker<T> {
+ private static final Log LOG = LogFactory.getLog(Invoker.class);
+
+ private static final Class<?> DOUBLE_ARRAY_CLASS = new double[0].getClass();
+ private static final Class<?> INT_ARRAY_CLASS = new int[0].getClass();
+ private static final Class<?> FLOAT_ARRAY_CLASS = new float[0].getClass();
+ private static final Class<?> STRING_ARRAY_CLASS = new String[0].getClass();
+ private static final Class<?> LONG_ARRAY_CLASS = new long[0].getClass();
+
+ @SuppressWarnings("unchecked")
+ private static final Set<Class<?>> ARRAY_CLASSES = Sets.newHashSet(
+ DOUBLE_ARRAY_CLASS, INT_ARRAY_CLASS, FLOAT_ARRAY_CLASS, STRING_ARRAY_CLASS,
+ LONG_ARRAY_CLASS);
+
private Method method_;
private Class<?>[] paramClasses_;
@@ -37,15 +58,17 @@ public class Invoker<T> {
private Class<?> selfClass_;
private Type returnType_;
- public Invoker(String fullName, String paramSpecsStr) throws ClassNotFoundException, FrontendException, SecurityException, NoSuchMethodException {
+ public Invoker(String fullName, String paramSpecsStr)
+ throws ClassNotFoundException, FrontendException, SecurityException, NoSuchMethodException {
this(fullName, paramSpecsStr, "true");
}
- public Invoker(String fullName, String paramSpecsStr, String isStatic) throws ClassNotFoundException, FrontendException, SecurityException, NoSuchMethodException {
+ public Invoker(String fullName, String paramSpecsStr, String isStatic)
+ throws ClassNotFoundException, FrontendException, SecurityException, NoSuchMethodException {
String className = fullName.substring(0, fullName.lastIndexOf('.'));
String methodName = fullName.substring(fullName.lastIndexOf('.')+1);
Class<?> klazz = Class.forName(className);
- String[] paramSpecs = paramSpecsStr.split(" ");
+ String[] paramSpecs = "".equals(paramSpecsStr) ? new String[0] : paramSpecsStr.split(" ");
isStatic_ = "static".equalsIgnoreCase(isStatic) || "true".equals(isStatic);
paramClasses_ = new Class<?>[paramSpecs.length];
for (int i = 0; i < paramSpecs.length; i++) {
@@ -58,8 +81,9 @@ public class Invoker<T> {
returnType_ = method_.getGenericReturnType();
}
+ @SuppressWarnings("rawtypes")
public Type getReturnType() {
- return returnType_;
+ return unPrimitivize((Class) returnType_);
}
private static Class<?>[] dropFirstClass(Class<?>[] original) {
@@ -89,8 +113,18 @@ public class Invoker<T> {
return Float.TYPE;
} else if ("long".equalsIgnoreCase(klass)) {
return Long.TYPE;
+ } else if ("double[]".equalsIgnoreCase(klass)) {
+ return DOUBLE_ARRAY_CLASS;
+ } else if ("int[]".equalsIgnoreCase(klass)) {
+ return INT_ARRAY_CLASS;
+ } else if ("long[]".equalsIgnoreCase(klass)) {
+ return LONG_ARRAY_CLASS;
+ } else if ("float[]".equalsIgnoreCase(klass)) {
+ return FLOAT_ARRAY_CLASS;
+ } else if ("string[]".equalsIgnoreCase(klass)) {
+ return STRING_ARRAY_CLASS;
} else {
- throw new FrontendException("unable to find mathing class for " + klass);
+ throw new FrontendException("unable to find matching class for " + klass);
}
}
@@ -105,20 +139,80 @@ public class Invoker<T> {
} else if (klass.equals(Double.TYPE)) {
return Double.class;
} else {
- return klass;
+ return klass;
+ }
+ }
+
+ private static <T> T convertToExpectedArg(Class<T> klass, Object obj) throws ExecException {
+ if (ARRAY_CLASSES.contains(klass)) {
+ DataBag dbag = (DataBag) obj;
+ if (STRING_ARRAY_CLASS.equals(klass)) {
+ List<String> dataList = Lists.newArrayList();
+ for (Tuple t : dbag) {
+ dataList.add( (String) t.get(0));
+ }
+ String[] dataArray = new String[dataList.size()];
+ for (int i = 0; i < dataList.size(); i++) {
+ dataArray[i] = dataList.get(i);
+ }
+ obj = dataArray;
+ } else {
+ List<Number> dataList = bagToNumberList(dbag);
+ if (DOUBLE_ARRAY_CLASS.equals(klass)) {
+ double[] dataArray = new double[dataList.size()];
+ for (int i = 0; i < dataList.size(); i++) {
+ dataArray[i] = dataList.get(i).doubleValue();
+ }
+ obj = dataArray;
+ } else if (INT_ARRAY_CLASS.equals(klass)) {
+ int[] dataArray = new int[dataList.size()];
+ for (int i = 0; i < dataList.size(); i++) {
+ dataArray[i] = dataList.get(i).intValue();
+ }
+ obj = dataArray;
+ } else if (FLOAT_ARRAY_CLASS.equals(klass)) {
+ float[] dataArray = new float[dataList.size()];
+ for (int i = 0; i < dataList.size(); i++) {
+ dataArray[i] = dataList.get(i).floatValue();
+ }
+ obj = dataArray;
+ } else if (LONG_ARRAY_CLASS.equals(klass)) {
+ long[] dataArray = new long[dataList.size()];
+ for (int i = 0; i < dataList.size(); i++) {
+ dataArray[i] = dataList.get(i).longValue();
+ }
+ obj = dataArray;
+ }
}
+ }
+ try {
+ return klass.cast(obj);
+ } catch (ClassCastException e) {
+ LOG.error("Error in dynamic argument processing. Casting to: "
+ + klass + " from: " + obj.getClass(), e);
+ throw new ExecException(e);
+ }
+ }
+
+ private static List<Number> bagToNumberList(DataBag dbag) throws ExecException {
+ List<Number> dataList = Lists.newArrayList();
+ for (Tuple t : dbag) {
+ dataList.add( (Number) t.get(0));
+ }
+ return dataList;
}
private Object[] tupleToArgs(Tuple t) throws ExecException {
- if ( (t == null && paramClasses_ != null) || (t != null && t.size() != paramClasses_.length)) {
+ if ( (t == null && (paramClasses_ != null && paramClasses_.length != 0)) // a null tuple is only acceptable for no-arg methods
+ || (t != null && t.size() < paramClasses_.length)) { // extra trailing fields are ignored
throw new ExecException("unable to match function arguments to declared signature.");
}
if (t == null) {
return null;
}
- Object[] args = new Object[t.size()];
- for (int i = 0; i < t.size(); i++) {
- args[i] = unPrimitivize(paramClasses_[i]).cast(t.get(i));
+ Object[] args = new Object[paramClasses_.length];
+ for (int i = 0; i < paramClasses_.length; i++) {
+ args[i] = convertToExpectedArg(unPrimitivize(paramClasses_[i]), t.get(i));
}
return args;
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestInvoker.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestInvoker.java?rev=988730&r1=988729&r2=988730&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestInvoker.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestInvoker.java Tue Aug 24 21:11:16 2010
@@ -29,6 +29,8 @@ import org.apache.pig.builtin.InvokeForF
import org.apache.pig.builtin.InvokeForInt;
import org.apache.pig.builtin.InvokeForLong;
import org.apache.pig.builtin.InvokeForString;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.junit.Test;
@@ -40,6 +42,8 @@ import org.junit.Test;
public class TestInvoker {
private final TupleFactory tf_ = TupleFactory.getInstance();
+ private final BagFactory bf_ = BagFactory.getInstance();
+
@Test
public void testStringInvoker() throws SecurityException, ClassNotFoundException, NoSuchMethodException, IOException {
@@ -67,6 +71,11 @@ public class TestInvoker {
}
@Test
+ public void testNoArgInvoker() throws SecurityException, ClassNotFoundException, NoSuchMethodException, IOException {
+ InvokeForInt id = new InvokeForInt(TestInvoker.class.getName() + ".simpleStaticFunction");
+ assertEquals(Integer.valueOf(1), id.exec(tf_.newTuple()));
+ }
+ @Test
public void testLongInvoker() throws SecurityException, ClassNotFoundException, NoSuchMethodException, NumberFormatException, IOException {
InvokeForLong il = new InvokeForLong("java.lang.Long.valueOf", "String");
Tuple t = tf_.newTuple(1);
@@ -85,6 +94,21 @@ public class TestInvoker {
}
@Test
+ public void testArrayConversion() throws SecurityException, ClassNotFoundException, NoSuchMethodException, IOException {
+ InvokeForInt ii = new InvokeForInt(TestInvoker.class.getName() + ".avg", "double[]");
+ DataBag nums = newSimpleBag(1.0, 2.0, 3.0);
+ assertEquals(Integer.valueOf(2), ii.exec(tf_.newTuple(nums)));
+
+ ii = new InvokeForInt(TestInvoker.class.getName() + ".avg", "long[]");
+ nums = newSimpleBag(1L, 2L, 3L);
+ assertEquals(Integer.valueOf(2), ii.exec(tf_.newTuple(nums)));
+
+ InvokeForString is = new InvokeForString(TestInvoker.class.getName() + ".concatStringArray", "string[]");
+ DataBag strings = newSimpleBag("foo", "bar", "baz");
+ assertEquals("foobarbaz", is.exec(tf_.newTuple(strings)));
+ }
+
+ @Test
public void testDoubleInvoker() throws SecurityException, ClassNotFoundException, NoSuchMethodException, NumberFormatException, IOException {
InvokeForDouble il = new InvokeForDouble("java.lang.Double.valueOf", "String");
Tuple t = tf_.newTuple(1);
@@ -106,6 +130,42 @@ public class TestInvoker {
return str1.concat(str2);
}
+ public static String concatStringArray(String[] strings) {
+ StringBuilder sb = new StringBuilder();
+ for (String s : strings) {
+ sb.append(s);
+ }
+ return sb.toString();
+ }
+
+ public static int simpleStaticFunction() {
+ return 1;
+ }
+
+ public static int avg(long[] nums) {
+ long sum = 0;
+ for (long d: nums) {
+ sum += d;
+ }
+ return (int) sum/nums.length;
+ }
+
+ public static int avg(double[] nums) {
+ double sum = 0;
+ for (double d: nums) {
+ sum += d;
+ }
+ return (int) sum/nums.length;
+ }
+
+ private DataBag newSimpleBag(Object... objects) {
+ DataBag bag = bf_.newDefaultBag();
+ for (Object o : objects) {
+ bag.add(tf_.newTuple(o));
+ }
+ return bag;
+ }
+
@Test
public void testSpeed() throws IOException, SecurityException, ClassNotFoundException, NoSuchMethodException {
EvalFunc<Double> log = new Log();