You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by dv...@apache.org on 2011/08/13 02:14:00 UTC

svn commit: r1157299 - in /pig/branches/branch-0.9: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/MonitoredUDFExecutor.java

Author: dvryaboy
Date: Sat Aug 13 00:14:00 2011
New Revision: 1157299

URL: http://svn.apache.org/viewvc?rev=1157299&view=rev
Log:
PIG-2102: MonitoredUDF does not work

Modified:
    pig/branches/branch-0.9/CHANGES.txt
    pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/MonitoredUDFExecutor.java

Modified: pig/branches/branch-0.9/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/CHANGES.txt?rev=1157299&r1=1157298&r2=1157299&view=diff
==============================================================================
--- pig/branches/branch-0.9/CHANGES.txt (original)
+++ pig/branches/branch-0.9/CHANGES.txt Sat Aug 13 00:14:00 2011
@@ -194,6 +194,8 @@ PIG-1696: Performance: Use System.arrayc
 
 BUG FIXES
 
+PIG-2102: MonitoredUDF does not work (dvryaboy)
+
 PIG-2183: Pig not working with Hadoop 0.20.203.0 (daijy)
 
 PIG-2199: Penny throws Exception when netty classes are missing (ddaniels888 via daijy)

Modified: pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/MonitoredUDFExecutor.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/MonitoredUDFExecutor.java?rev=1157299&r1=1157298&r2=1157299&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/MonitoredUDFExecutor.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/MonitoredUDFExecutor.java Sat Aug 13 00:14:00 2011
@@ -19,6 +19,7 @@
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.util;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Type;
@@ -30,7 +31,6 @@ import java.util.concurrent.TimeoutExcep
 
 import org.apache.hadoop.mapreduce.StatusReporter;
 import org.apache.pig.EvalFunc;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigLogger;
 import org.apache.pig.builtin.MonitoredUDF;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.tools.pigstats.PigStatusReporter;
@@ -48,20 +48,20 @@ import com.google.common.util.concurrent
  * handling errors and default values.
  * 
  */
-public class MonitoredUDFExecutor {
+public class MonitoredUDFExecutor implements Serializable {
 
-    private final ExecutorService exec;
-    private final TimeUnit timeUnit;
-    private final long duration;
-    private final Object defaultValue;
+    private final transient ExecutorService exec;
+    private final transient TimeUnit timeUnit;
+    private final transient long duration;
+    private final transient Object defaultValue;
     @SuppressWarnings("unchecked")
-    private final EvalFunc evalFunc;
-    private final Function<Tuple, Object> closure;
+    private final transient EvalFunc evalFunc;
+    private final transient Function<Tuple, Object> closure;
 
     // Let us reflect upon our errors.
-    private final Class<? extends ErrorCallback> errorCallback;
-    private final Method errorHandler;
-    private final Method timeoutHandler;
+    private final transient Class<? extends ErrorCallback> errorCallback;
+    private final transient Method errorHandler;
+    private final transient Method timeoutHandler;
 
     @SuppressWarnings("unchecked")
     public MonitoredUDFExecutor(EvalFunc udf) {
@@ -89,6 +89,7 @@ public class MonitoredUDFExecutor {
         Type retType = udf.getReturnType();
         defaultValue = getDefaultValue(anno, retType);
         closure = new Function<Tuple, Object>() {
+            @Override
             public Object apply(Tuple input) {
                 try {
                     return evalFunc.exec(input);
@@ -168,6 +169,7 @@ public class MonitoredUDFExecutor {
                     // CancellationException, ExecutionException  and InterruptedException here
                     // and do something special for UDF IOExceptions as opposed to thread exceptions.
                     new Function<Exception, Exception>() { 
+                        @Override
                         public Exception apply(Exception e) { 
                             return e; 
                         }