You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/10/22 19:41:16 UTC

svn commit: r1633662 - in /pig/branches/branch-0.14: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ src/org/apache/pig/backend/hadoop/executionen...

Author: rohini
Date: Wed Oct 22 17:41:16 2014
New Revision: 1633662

URL: http://svn.apache.org/r1633662
Log:
PIG-4039: New interface for resetting static variables for jvm reuse (rohini)

Added:
    pig/branches/branch-0.14/src/org/apache/pig/JVMReuseImpl.java
    pig/branches/branch-0.14/src/org/apache/pig/JVMReuseManager.java
    pig/branches/branch-0.14/src/org/apache/pig/StaticDataCleanup.java
Modified:
    pig/branches/branch-0.14/CHANGES.txt
    pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
    pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
    pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
    pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
    pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
    pig/branches/branch-0.14/src/org/apache/pig/impl/PigContext.java
    pig/branches/branch-0.14/src/org/apache/pig/impl/util/UDFContext.java
    pig/branches/branch-0.14/src/org/apache/pig/tools/pigstats/PigStatusReporter.java

Modified: pig/branches/branch-0.14/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/CHANGES.txt?rev=1633662&r1=1633661&r2=1633662&view=diff
==============================================================================
--- pig/branches/branch-0.14/CHANGES.txt (original)
+++ pig/branches/branch-0.14/CHANGES.txt Wed Oct 22 17:41:16 2014
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
  
 IMPROVEMENTS
 
+PIG-4039: New interface for resetting static variables for jvm reuse (rohini)
+
 PIG-3870: STRSPLITTOBAG UDF (cryptoe via daijy)
 
 PIG-4080: Add Preprocessor commands and more to the black/whitelisting feature (prkommireddi via daijy)

Added: pig/branches/branch-0.14/src/org/apache/pig/JVMReuseImpl.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/JVMReuseImpl.java?rev=1633662&view=auto
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/JVMReuseImpl.java (added)
+++ pig/branches/branch-0.14/src/org/apache/pig/JVMReuseImpl.java Wed Oct 22 17:41:16 2014
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+import java.lang.reflect.Method;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class JVMReuseImpl {
+
+    private static Log LOG = LogFactory.getLog(JVMReuseImpl.class);
+
+    public void cleanupStaticData() {
+        List<Method> staticCleanupMethods = JVMReuseManager.getInstance()
+                .getStaticDataCleanupMethods();
+        for (Method m : staticCleanupMethods) {
+            try {
+                String msg = "Invoking method " + m.getName() + " in class "
+                        + m.getDeclaringClass().getName() + " for static data cleanup";
+                if (m.getDeclaringClass().getName().startsWith("org.apache.pig")) {
+                    LOG.debug(msg);
+                } else {
+                    LOG.info(msg);
+                }
+                m.invoke(null);
+            } catch (Exception e) {
+                throw new RuntimeException("Error while invoking method "
+                        + m.getName() + " in class " + m.getDeclaringClass().getName()
+                        + " for static data cleanup", e);
+            }
+        }
+    }
+}

Added: pig/branches/branch-0.14/src/org/apache/pig/JVMReuseManager.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/JVMReuseManager.java?rev=1633662&view=auto
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/JVMReuseManager.java (added)
+++ pig/branches/branch-0.14/src/org/apache/pig/JVMReuseManager.java Wed Oct 22 17:41:16 2014
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+
+/**
+ * This class is used to manage JVM reuse in case of execution engines like Tez
+ * and Spark.
+ *
+ * Static data members of a UDF, LoadFunc or StoreFunc class
+ * need to be reset or object references made null or reinitialized
+ * when the jvm container is reused for new tasks.
+ *
+ * Example usage to perform static data cleanup in a UDF is as follows.
+ *
+ * public class MyUDF extends EvalFunc<Tuple> {
+ *   private static int numInvocations = 0;
+ *   private static Reporter reporter;
+ *
+ *   static {
+ *      // Register this class for static data cleanup
+ *      JVMReuseManager.getInstance().registerForStaticDataCleanup(MyUDF.class);
+ *   }
+ *
+ *   // Write a public static method that performs the cleanup
+ *   // and annotate it with @StaticDataCleanup
+ *   @StaticDataCleanup
+ *   public static void staticDataCleanup() {
+ *      numInvocations = 0;
+ *      reporter = null;
+ *   }
+ *   #### UDF Code goes here ######
+ * }
+ *
+ * @since Pig 0.14
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class JVMReuseManager {
+
+    private static Log LOG = LogFactory.getLog(JVMReuseImpl.class);
+    private List<Method> cleanupMethods = new ArrayList<Method>();
+    private static JVMReuseManager instance = new JVMReuseManager();
+
+    private JVMReuseManager() {
+    }
+
+    public static JVMReuseManager getInstance() {
+        return instance;
+    }
+
+    public void registerForStaticDataCleanup(Class<?> clazz) {
+        for (Method method : clazz.getMethods()) {
+            if (method.isAnnotationPresent(StaticDataCleanup.class)) {
+                if (!(Modifier.isStatic(method.getModifiers())
+                        && Modifier.isPublic(method.getModifiers()))) {
+                    throw new RuntimeException(
+                            "Method " + method.getName() + " in class " + clazz.getName() +
+                             "should be public and static as it is annotated with  ");
+                }
+                LOG.debug("Method " + method.getName() + " in class "
+                        + method.getDeclaringClass()
+                        + " registered for static data cleanup");
+                instance.cleanupMethods.add(method);
+            }
+        }
+    }
+
+    List<Method> getStaticDataCleanupMethods() {
+        return cleanupMethods;
+    }
+
+}

Added: pig/branches/branch-0.14/src/org/apache/pig/StaticDataCleanup.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/StaticDataCleanup.java?rev=1633662&view=auto
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/StaticDataCleanup.java (added)
+++ pig/branches/branch-0.14/src/org/apache/pig/StaticDataCleanup.java Wed Oct 22 17:41:16 2014
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME) // retained at runtime so reflection can see it
+@Target(ElementType.METHOD)
+public @interface StaticDataCleanup {
+    // Marker annotation (no elements); JVMReuseManager detects it via isAnnotationPresent().
+}

Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=1633662&r1=1633661&r2=1633662&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java Wed Oct 22 17:41:16 2014
@@ -27,7 +27,9 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.log4j.PropertyConfigurator;
+import org.apache.pig.JVMReuseManager;
 import org.apache.pig.PigException;
+import org.apache.pig.StaticDataCleanup;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -73,6 +75,15 @@ public class PigCombiner {
         PigContext pigContext = null;
         private volatile boolean initialized = false;
 
+        static {
+            JVMReuseManager.getInstance().registerForStaticDataCleanup(Combine.class);
+        }
+
+        @StaticDataCleanup
+        public static void staticDataCleanup() {
+            firstTime = true;
+        }
+
         /**
          * Configures the Reduce plan, the POPackage operator
          * and the reporter thread

Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java?rev=1633662&r1=1633661&r2=1633662&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java Wed Oct 22 17:41:16 2014
@@ -30,7 +30,9 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.pig.JVMReuseManager;
 import org.apache.pig.PigException;
+import org.apache.pig.StaticDataCleanup;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
@@ -100,6 +102,17 @@ public class PigGenericMapReduce {
 
     public static ThreadLocal<Configuration> sJobConfInternal = new ThreadLocal<Configuration>();
 
+    static {
+        JVMReuseManager.getInstance().registerForStaticDataCleanup(PigGenericMapReduce.class);
+    }
+
+    @StaticDataCleanup
+    public static void staticDataCleanup() {
+        sJobContext = null;
+        sJobConf = null;
+        sJobConfInternal = new ThreadLocal<Configuration>();
+    }
+
     public static class Map extends PigMapBase {
 
         @Override

Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java?rev=1633662&r1=1633661&r2=1633662&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java Wed Oct 22 17:41:16 2014
@@ -56,17 +56,11 @@ public final class PigHadoopLogger imple
         return logger;
     }
 
-    public void destroy() {
-        if (reporter != null) {
-            reporter.destroy();
-        }
-        reporter = null;
-    }
-
     public void setReporter(PigStatusReporter reporter) {
         this.reporter = reporter;
     }
 
+    @Override
     @SuppressWarnings("rawtypes")
     public void warn(Object o, String msg, Enum warningEnum) {
         String className = o.getClass().getName();

Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1633662&r1=1633661&r2=1633662&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Wed Oct 22 17:41:16 2014
@@ -24,6 +24,8 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.JVMReuseManager;
+import org.apache.pig.StaticDataCleanup;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -100,7 +102,7 @@ public abstract class PhysicalOperator e
     // Will be used by operators to report status or transmit heartbeat
     // Should be set by the backends to appropriate implementations that
     // wrap their own version of a reporter.
-    public static ThreadLocal<PigProgressable> reporter = new ThreadLocal<PigProgressable>();
+    protected static ThreadLocal<PigProgressable> reporter = new ThreadLocal<PigProgressable>();
 
     // Will be used by operators to aggregate warning messages
     // Should be set by the backends to appropriate implementations that
@@ -120,6 +122,10 @@ public abstract class PhysicalOperator e
 
     private List<OriginalLocation> originalLocations =  new ArrayList<OriginalLocation>();
 
+    static {
+        JVMReuseManager.getInstance().registerForStaticDataCleanup(PhysicalOperator.class);
+    }
+
     public PhysicalOperator(OperatorKey k) {
         this(k, -1, null);
     }
@@ -451,6 +457,11 @@ public abstract class PhysicalOperator e
         PhysicalOperator.reporter.set(reporter);
     }
 
+    @StaticDataCleanup
+    public static void staticDataCleanup() {
+        reporter = new ThreadLocal<PigProgressable>();
+    }
+
     /**
      * Make a deep copy of this operator. This function is blank, however,
      * we should leave a place holder so that the subclasses can clone

Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java?rev=1633662&r1=1633661&r2=1633662&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java Wed Oct 22 17:41:16 2014
@@ -30,6 +30,7 @@ import java.util.Set;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.JVMReuseImpl;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
@@ -38,7 +39,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFFinishVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigProgressable;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -111,10 +111,7 @@ public class PigProcessor extends Abstra
         sampleMap = null;
 
         // Reset static variables cleared for avoiding OOM.
-        // TODO: Figure out a cleaner way to do this. ThreadLocals actually can be avoided all together
-        // for mapreduce/tez mode and just used for Local mode.
-        PhysicalOperator.reporter = new ThreadLocal<PigProgressable>();
-        PigMapReduce.sJobConfInternal = new ThreadLocal<Configuration>();
+        new JVMReuseImpl().cleanupStaticData();
 
         UserPayload payload = getContext().getUserPayload();
         conf = TezUtils.createConfFromUserPayload(payload);
@@ -158,24 +155,18 @@ public class PigProcessor extends Abstra
 
     @Override
     public void close() throws Exception {
-        // Avoid memory leak. ThreadLocals especially leak a lot of memory.
-        // The Reporter and Context objects hold TezProcessorContextImpl
-        // which holds input and its sort buffers which are huge.
-        PhysicalOperator.reporter = new ThreadLocal<PigProgressable>();
-        PigMapReduce.sJobConfInternal = new ThreadLocal<Configuration>();
-        PigMapReduce.sJobContext = null;
-        PigContext.setPackageImportList(null);
-        UDFContext.destroy();
+
         execPlan = null;
         fileOutputs = null;
         leaf = null;
         conf = null;
         sampleMap = null;
         sampleVertex = null;
-        if (pigHadoopLogger != null) {
-            pigHadoopLogger.destroy();
-            pigHadoopLogger = null;
-        }
+        pigHadoopLogger = null;
+        // Avoid memory leak. ThreadLocals especially leak a lot of memory.
+        // The Reporter and Context objects hold TezProcessorContextImpl
+        // which holds input and its sort buffers which are huge.
+        new JVMReuseImpl().cleanupStaticData();
     }
 
     @Override

Modified: pig/branches/branch-0.14/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/impl/PigContext.java?rev=1633662&r1=1633661&r2=1633662&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/impl/PigContext.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/impl/PigContext.java Wed Oct 22 17:41:16 2014
@@ -48,13 +48,13 @@ import org.antlr.runtime.tree.Tree;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.Level;
 import org.apache.pig.ExecType;
 import org.apache.pig.ExecTypeProvider;
 import org.apache.pig.FuncSpec;
-import org.apache.pig.Main;
+import org.apache.pig.JVMReuseManager;
 import org.apache.pig.PigException;
+import org.apache.pig.StaticDataCleanup;
 import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.backend.datastorage.DataStorageException;
 import org.apache.pig.backend.datastorage.ElementDescriptor;
@@ -65,7 +65,6 @@ import org.apache.pig.backend.hadoop.dat
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.impl.streaming.ExecutableManager;
 import org.apache.pig.impl.streaming.StreamingCommand;
-import org.apache.pig.impl.util.JarManager;
 import org.apache.pig.tools.parameters.ParameterSubstitutionPreprocessor;
 import org.apache.pig.tools.parameters.ParseException;
 import org.apache.pig.tools.parameters.PreprocessorContext;
@@ -172,6 +171,15 @@ public class PigContext implements Seria
     // List of paths skipped for automatic shipping
     List<String> skippedShipPaths = new ArrayList<String>();
 
+    static {
+        JVMReuseManager.getInstance().registerForStaticDataCleanup(PigContext.class);
+    }
+
+    @StaticDataCleanup
+    public static void staticDataCleanup() {
+        packageImportList.set(null);
+    }
+
     /**
      * extends URLClassLoader to allow adding to classpath as new jars
      * are registered.

Modified: pig/branches/branch-0.14/src/org/apache/pig/impl/util/UDFContext.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/impl/util/UDFContext.java?rev=1633662&r1=1633661&r2=1633662&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/impl/util/UDFContext.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/impl/util/UDFContext.java Wed Oct 22 17:41:16 2014
@@ -24,6 +24,8 @@ import java.util.HashMap;
 import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.JVMReuseManager;
+import org.apache.pig.StaticDataCleanup;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 
 public class UDFContext {
@@ -41,6 +43,10 @@ public class UDFContext {
         }
     };
 
+    static {
+        JVMReuseManager.getInstance().registerForStaticDataCleanup(UDFContext.class);
+    }
+
     private UDFContext() {
         udfConfs = new HashMap<UDFContextKey, Properties>();
     }
@@ -62,7 +68,8 @@ public class UDFContext {
     /*
      *  internal pig use only - should NOT be called from user code
      */
-    public static void destroy() {
+    @StaticDataCleanup
+    public static void cleanupStaticData() {
         tss = new ThreadLocal<UDFContext>() {
             @Override
             public UDFContext initialValue() {

Modified: pig/branches/branch-0.14/src/org/apache/pig/tools/pigstats/PigStatusReporter.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/tools/pigstats/PigStatusReporter.java?rev=1633662&r1=1633661&r2=1633662&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/tools/pigstats/PigStatusReporter.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/tools/pigstats/PigStatusReporter.java Wed Oct 22 17:41:16 2014
@@ -21,6 +21,8 @@ package org.apache.pig.tools.pigstats;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.StatusReporter;
 import org.apache.hadoop.util.Progressable;
+import org.apache.pig.JVMReuseManager;
+import org.apache.pig.StaticDataCleanup;
 import org.apache.pig.backend.hadoop.executionengine.TaskContext;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
@@ -33,6 +35,15 @@ public class PigStatusReporter extends S
 
     private TaskContext<?> context = null;
 
+    static {
+        JVMReuseManager.getInstance().registerForStaticDataCleanup(PigStatusReporter.class);
+    }
+
+    @StaticDataCleanup
+    public static void staticDataCleanup() {
+        reporter = null;
+    }
+
     private PigStatusReporter() {
     }
 
@@ -46,10 +57,6 @@ public class PigStatusReporter extends S
         return reporter;
     }
 
-    public void destroy() {
-        context = null;
-    }
-
     public void setContext(TaskContext<?> context) {
         this.context = context;
     }