You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by dv...@apache.org on 2011/08/13 02:14:00 UTC
svn commit: r1157299 - in /pig/branches/branch-0.9: CHANGES.txt
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/MonitoredUDFExecutor.java
Author: dvryaboy
Date: Sat Aug 13 00:14:00 2011
New Revision: 1157299
URL: http://svn.apache.org/viewvc?rev=1157299&view=rev
Log:
PIG-2102: MonitoredUDF does not work
Modified:
pig/branches/branch-0.9/CHANGES.txt
pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/MonitoredUDFExecutor.java
Modified: pig/branches/branch-0.9/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/CHANGES.txt?rev=1157299&r1=1157298&r2=1157299&view=diff
==============================================================================
--- pig/branches/branch-0.9/CHANGES.txt (original)
+++ pig/branches/branch-0.9/CHANGES.txt Sat Aug 13 00:14:00 2011
@@ -194,6 +194,8 @@ PIG-1696: Performance: Use System.arrayc
BUG FIXES
+PIG-2102: MonitoredUDF does not work (dvryaboy)
+
PIG-2183: Pig not working with Hadoop 0.20.203.0 (daijy)
PIG-2199: Penny throws Exception when netty classes are missing (ddaniels888 via daijy)
Modified: pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/MonitoredUDFExecutor.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/MonitoredUDFExecutor.java?rev=1157299&r1=1157298&r2=1157299&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/MonitoredUDFExecutor.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/MonitoredUDFExecutor.java Sat Aug 13 00:14:00 2011
@@ -19,6 +19,7 @@
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.util;
import java.io.IOException;
+import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
@@ -30,7 +31,6 @@ import java.util.concurrent.TimeoutExcep
import org.apache.hadoop.mapreduce.StatusReporter;
import org.apache.pig.EvalFunc;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigLogger;
import org.apache.pig.builtin.MonitoredUDF;
import org.apache.pig.data.Tuple;
import org.apache.pig.tools.pigstats.PigStatusReporter;
@@ -48,20 +48,20 @@ import com.google.common.util.concurrent
* handling errors and default values.
*
*/
-public class MonitoredUDFExecutor {
+public class MonitoredUDFExecutor implements Serializable {
- private final ExecutorService exec;
- private final TimeUnit timeUnit;
- private final long duration;
- private final Object defaultValue;
+ private final transient ExecutorService exec;
+ private final transient TimeUnit timeUnit;
+ private final transient long duration;
+ private final transient Object defaultValue;
@SuppressWarnings("unchecked")
- private final EvalFunc evalFunc;
- private final Function<Tuple, Object> closure;
+ private final transient EvalFunc evalFunc;
+ private final transient Function<Tuple, Object> closure;
// Let us reflect upon our errors.
- private final Class<? extends ErrorCallback> errorCallback;
- private final Method errorHandler;
- private final Method timeoutHandler;
+ private final transient Class<? extends ErrorCallback> errorCallback;
+ private final transient Method errorHandler;
+ private final transient Method timeoutHandler;
@SuppressWarnings("unchecked")
public MonitoredUDFExecutor(EvalFunc udf) {
@@ -89,6 +89,7 @@ public class MonitoredUDFExecutor {
Type retType = udf.getReturnType();
defaultValue = getDefaultValue(anno, retType);
closure = new Function<Tuple, Object>() {
+ @Override
public Object apply(Tuple input) {
try {
return evalFunc.exec(input);
@@ -168,6 +169,7 @@ public class MonitoredUDFExecutor {
// CancellationException, ExecutionException and InterruptedException here
// and do something special for UDF IOExceptions as opposed to thread exceptions.
new Function<Exception, Exception>() {
+ @Override
public Exception apply(Exception e) {
return e;
}