You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/10/22 19:41:16 UTC
svn commit: r1633662 - in /pig/branches/branch-0.14: ./ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/
src/org/apache/pig/backend/hadoop/executionen...
Author: rohini
Date: Wed Oct 22 17:41:16 2014
New Revision: 1633662
URL: http://svn.apache.org/r1633662
Log:
PIG-4039: New interface for resetting static variables for jvm reuse (rohini)
Added:
pig/branches/branch-0.14/src/org/apache/pig/JVMReuseImpl.java
pig/branches/branch-0.14/src/org/apache/pig/JVMReuseManager.java
pig/branches/branch-0.14/src/org/apache/pig/StaticDataCleanup.java
Modified:
pig/branches/branch-0.14/CHANGES.txt
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
pig/branches/branch-0.14/src/org/apache/pig/impl/PigContext.java
pig/branches/branch-0.14/src/org/apache/pig/impl/util/UDFContext.java
pig/branches/branch-0.14/src/org/apache/pig/tools/pigstats/PigStatusReporter.java
Modified: pig/branches/branch-0.14/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/CHANGES.txt?rev=1633662&r1=1633661&r2=1633662&view=diff
==============================================================================
--- pig/branches/branch-0.14/CHANGES.txt (original)
+++ pig/branches/branch-0.14/CHANGES.txt Wed Oct 22 17:41:16 2014
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-4039: New interface for resetting static variables for jvm reuse (rohini)
+
PIG-3870: STRSPLITTOBAG UDF (cryptoe via daijy)
PIG-4080: Add Preprocessor commands and more to the black/whitelisting feature (prkommireddi via daijy)
Added: pig/branches/branch-0.14/src/org/apache/pig/JVMReuseImpl.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/JVMReuseImpl.java?rev=1633662&view=auto
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/JVMReuseImpl.java (added)
+++ pig/branches/branch-0.14/src/org/apache/pig/JVMReuseImpl.java Wed Oct 22 17:41:16 2014
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+import java.lang.reflect.Method;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Invokes the static cleanup methods that have been registered with
+ * {@link JVMReuseManager}, so that static state does not carry over when a
+ * JVM is reused.
+ */
+public class JVMReuseImpl {
+
+    private static final Log LOG = LogFactory.getLog(JVMReuseImpl.class);
+
+    /**
+     * Calls every registered cleanup method as a static, no-argument method.
+     * Methods declared in classes whose name starts with {@code org.apache.pig}
+     * are announced at debug level, all others at info level.
+     *
+     * @throws RuntimeException if logging or invoking a method fails; the
+     *         original exception is kept as the cause and the remaining
+     *         methods are not invoked
+     */
+    public void cleanupStaticData() {
+        List<Method> registered = JVMReuseManager.getInstance()
+                .getStaticDataCleanupMethods();
+        // Walk a snapshot rather than the manager's list: a cleanup method may
+        // initialize a class whose static block registers more methods, which
+        // would otherwise fail the iteration with ConcurrentModificationException.
+        for (Method m : registered.toArray(new Method[0])) {
+            String owner = m.getDeclaringClass().getName();
+            String target = m.getName() + " in class " + owner;
+            try {
+                String msg = "Invoking method " + target + " for static data cleanup";
+                if (owner.startsWith("org.apache.pig")) {
+                    LOG.debug(msg);
+                } else {
+                    LOG.info(msg);
+                }
+                // Registered methods are static, hence the null receiver.
+                m.invoke(null);
+            } catch (Exception e) {
+                throw new RuntimeException("Error while invoking method "
+                        + target + " for static data cleanup", e);
+            }
+        }
+    }
+}
Added: pig/branches/branch-0.14/src/org/apache/pig/JVMReuseManager.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/JVMReuseManager.java?rev=1633662&view=auto
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/JVMReuseManager.java (added)
+++ pig/branches/branch-0.14/src/org/apache/pig/JVMReuseManager.java Wed Oct 22 17:41:16 2014
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+
+/**
+ * This class is used to manage JVM Reuse in case of execution engines like Tez
+ * and Spark.
+ *
+ * Static data members of a UDF, LoadFunc or StoreFunc class
+ * need to be reset or object references made null or reinitialized
+ * when the jvm container is reused for new tasks.
+ *
+ * Example usage to perform static data cleanup in a UDF as follows.
+ *
+ * public class MyUDF extends EvalFunc<Tuple> {
+ * private static int numInvocations = 0;
+ * private static Reporter reporter;
+ *
+ * static {
+ * // Register this class for static data cleanup
+ * JVMReuseManager.getInstance().registerForStaticDataCleanup(MyUDF.class);
+ * }
+ *
+ * // Write a public static method that performs the cleanup
+ * // and annotate it with @StaticDataCleanup
+ * @StaticDataCleanup
+ * public static void staticDataCleanup() {
+ * numInvocations = 0;
+ * reporter = null;
+ * }
+ * #### UDF Code goes here ######
+ * }
+ *
+ * @since Pig 0.14
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class JVMReuseManager {
+
+ private static Log LOG = LogFactory.getLog(JVMReuseImpl.class);
+ private List<Method> cleanupMethods = new ArrayList<Method>();
+ private static JVMReuseManager instance = new JVMReuseManager();
+
+ private JVMReuseManager() {
+ }
+
+ public static JVMReuseManager getInstance() {
+ return instance;
+ }
+
+ public void registerForStaticDataCleanup(Class<?> clazz) {
+ for (Method method : clazz.getMethods()) {
+ if (method.isAnnotationPresent(StaticDataCleanup.class)) {
+ if (!(Modifier.isStatic(method.getModifiers())
+ && Modifier.isPublic(method.getModifiers()))) {
+ throw new RuntimeException(
+ "Method " + method.getName() + " in class " + clazz.getName() +
+ "should be public and static as it is annotated with ");
+ }
+ LOG.debug("Method " + method.getName() + " in class "
+ + method.getDeclaringClass()
+ + " registered for static data cleanup");
+ instance.cleanupMethods.add(method);
+ }
+ }
+ }
+
+ List<Method> getStaticDataCleanupMethods() {
+ return cleanupMethods;
+ }
+
+}
Added: pig/branches/branch-0.14/src/org/apache/pig/StaticDataCleanup.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/StaticDataCleanup.java?rev=1633662&view=auto
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/StaticDataCleanup.java (added)
+++ pig/branches/branch-0.14/src/org/apache/pig/StaticDataCleanup.java Wed Oct 22 17:41:16 2014
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marks a method that resets a class's static data. JVMReuseManager looks for
+ * this annotation reflectively when a class is passed to
+ * registerForStaticDataCleanup, which is why it is retained at runtime; that
+ * method rejects annotated methods that are not public and static.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface StaticDataCleanup {
+
+}
Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=1633662&r1=1633661&r2=1633662&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java Wed Oct 22 17:41:16 2014
@@ -27,7 +27,9 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.log4j.PropertyConfigurator;
+import org.apache.pig.JVMReuseManager;
import org.apache.pig.PigException;
+import org.apache.pig.StaticDataCleanup;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -73,6 +75,15 @@ public class PigCombiner {
PigContext pigContext = null;
private volatile boolean initialized = false;
+ // Hand Combine.class to JVMReuseManager, which records the class's public
+ // static methods annotated with @StaticDataCleanup.
+ static {
+ JVMReuseManager.getInstance().registerForStaticDataCleanup(Combine.class);
+ }
+
+ /**
+ * Static data cleanup hook: sets firstTime back to true.
+ */
+ @StaticDataCleanup
+ public static void staticDataCleanup() {
+ firstTime = true;
+ }
+
/**
* Configures the Reduce plan, the POPackage operator
* and the reporter thread
Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java?rev=1633662&r1=1633661&r2=1633662&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java Wed Oct 22 17:41:16 2014
@@ -30,7 +30,9 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.pig.JVMReuseManager;
import org.apache.pig.PigException;
+import org.apache.pig.StaticDataCleanup;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
@@ -100,6 +102,17 @@ public class PigGenericMapReduce {
public static ThreadLocal<Configuration> sJobConfInternal = new ThreadLocal<Configuration>();
+ // Hand this class to JVMReuseManager, which records its public static
+ // methods annotated with @StaticDataCleanup.
+ static {
+ JVMReuseManager.getInstance().registerForStaticDataCleanup(PigGenericMapReduce.class);
+ }
+
+ /**
+ * Static data cleanup hook: nulls sJobContext and sJobConf and replaces
+ * sJobConfInternal with a new ThreadLocal instance.
+ */
+ @StaticDataCleanup
+ public static void staticDataCleanup() {
+ sJobContext = null;
+ sJobConf = null;
+ sJobConfInternal = new ThreadLocal<Configuration>();
+ }
+
public static class Map extends PigMapBase {
@Override
Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java?rev=1633662&r1=1633661&r2=1633662&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigHadoopLogger.java Wed Oct 22 17:41:16 2014
@@ -56,17 +56,11 @@ public final class PigHadoopLogger imple
return logger;
}
- public void destroy() {
- if (reporter != null) {
- reporter.destroy();
- }
- reporter = null;
- }
-
public void setReporter(PigStatusReporter reporter) {
this.reporter = reporter;
}
+ @Override
@SuppressWarnings("rawtypes")
public void warn(Object o, String msg, Enum warningEnum) {
String className = o.getClass().getName();
Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1633662&r1=1633661&r2=1633662&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Wed Oct 22 17:41:16 2014
@@ -24,6 +24,8 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.pig.JVMReuseManager;
+import org.apache.pig.StaticDataCleanup;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -100,7 +102,7 @@ public abstract class PhysicalOperator e
// Will be used by operators to report status or transmit heartbeat
// Should be set by the backends to appropriate implementations that
// wrap their own version of a reporter.
- public static ThreadLocal<PigProgressable> reporter = new ThreadLocal<PigProgressable>();
+ protected static ThreadLocal<PigProgressable> reporter = new ThreadLocal<PigProgressable>();
// Will be used by operators to aggregate warning messages
// Should be set by the backends to appropriate implementations that
@@ -120,6 +122,10 @@ public abstract class PhysicalOperator e
private List<OriginalLocation> originalLocations = new ArrayList<OriginalLocation>();
+ // Hand this class to JVMReuseManager, which records its public static
+ // methods annotated with @StaticDataCleanup.
+ static {
+ JVMReuseManager.getInstance().registerForStaticDataCleanup(PhysicalOperator.class);
+ }
+
public PhysicalOperator(OperatorKey k) {
this(k, -1, null);
}
@@ -451,6 +457,11 @@ public abstract class PhysicalOperator e
PhysicalOperator.reporter.set(reporter);
}
+ /**
+ * Static data cleanup hook: replaces the static reporter ThreadLocal with a
+ * new instance, so values set on the previous instance are no longer
+ * reachable through this field.
+ */
+ @StaticDataCleanup
+ public static void staticDataCleanup() {
+ reporter = new ThreadLocal<PigProgressable>();
+ }
+
/**
* Make a deep copy of this operator. This function is blank, however,
* we should leave a place holder so that the subclasses can clone
Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java?rev=1633662&r1=1633661&r2=1633662&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java Wed Oct 22 17:41:16 2014
@@ -30,6 +30,7 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.JVMReuseImpl;
import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
@@ -38,7 +39,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFFinishVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigProgressable;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -111,10 +111,7 @@ public class PigProcessor extends Abstra
sampleMap = null;
// Reset static variables cleared for avoiding OOM.
- // TODO: Figure out a cleaner way to do this. ThreadLocals actually can be avoided all together
- // for mapreduce/tez mode and just used for Local mode.
- PhysicalOperator.reporter = new ThreadLocal<PigProgressable>();
- PigMapReduce.sJobConfInternal = new ThreadLocal<Configuration>();
+ new JVMReuseImpl().cleanupStaticData();
UserPayload payload = getContext().getUserPayload();
conf = TezUtils.createConfFromUserPayload(payload);
@@ -158,24 +155,18 @@ public class PigProcessor extends Abstra
@Override
public void close() throws Exception {
- // Avoid memory leak. ThreadLocals especially leak a lot of memory.
- // The Reporter and Context objects hold TezProcessorContextImpl
- // which holds input and its sort buffers which are huge.
- PhysicalOperator.reporter = new ThreadLocal<PigProgressable>();
- PigMapReduce.sJobConfInternal = new ThreadLocal<Configuration>();
- PigMapReduce.sJobContext = null;
- PigContext.setPackageImportList(null);
- UDFContext.destroy();
+
execPlan = null;
fileOutputs = null;
leaf = null;
conf = null;
sampleMap = null;
sampleVertex = null;
- if (pigHadoopLogger != null) {
- pigHadoopLogger.destroy();
- pigHadoopLogger = null;
- }
+ pigHadoopLogger = null;
+ // Avoid memory leak. ThreadLocals especially leak a lot of memory.
+ // The Reporter and Context objects hold TezProcessorContextImpl
+ // which holds input and its sort buffers which are huge.
+ new JVMReuseImpl().cleanupStaticData();
}
@Override
Modified: pig/branches/branch-0.14/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/impl/PigContext.java?rev=1633662&r1=1633661&r2=1633662&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/impl/PigContext.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/impl/PigContext.java Wed Oct 22 17:41:16 2014
@@ -48,13 +48,13 @@ import org.antlr.runtime.tree.Tree;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.Level;
import org.apache.pig.ExecType;
import org.apache.pig.ExecTypeProvider;
import org.apache.pig.FuncSpec;
-import org.apache.pig.Main;
+import org.apache.pig.JVMReuseManager;
import org.apache.pig.PigException;
+import org.apache.pig.StaticDataCleanup;
import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.backend.datastorage.DataStorageException;
import org.apache.pig.backend.datastorage.ElementDescriptor;
@@ -65,7 +65,6 @@ import org.apache.pig.backend.hadoop.dat
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.impl.streaming.ExecutableManager;
import org.apache.pig.impl.streaming.StreamingCommand;
-import org.apache.pig.impl.util.JarManager;
import org.apache.pig.tools.parameters.ParameterSubstitutionPreprocessor;
import org.apache.pig.tools.parameters.ParseException;
import org.apache.pig.tools.parameters.PreprocessorContext;
@@ -172,6 +171,15 @@ public class PigContext implements Seria
// List of paths skipped for automatic shipping
List<String> skippedShipPaths = new ArrayList<String>();
+ // Hand this class to JVMReuseManager, which records its public static
+ // methods annotated with @StaticDataCleanup.
+ static {
+ JVMReuseManager.getInstance().registerForStaticDataCleanup(PigContext.class);
+ }
+
+ /**
+ * Static data cleanup hook: calls packageImportList.set(null).
+ * NOTE(review): packageImportList is presumably a ThreadLocal, in which case
+ * only the calling thread's value is cleared - confirm against its declaration.
+ */
+ @StaticDataCleanup
+ public static void staticDataCleanup() {
+ packageImportList.set(null);
+ }
+
/**
* extends URLClassLoader to allow adding to classpath as new jars
* are registered.
Modified: pig/branches/branch-0.14/src/org/apache/pig/impl/util/UDFContext.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/impl/util/UDFContext.java?rev=1633662&r1=1633661&r2=1633662&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/impl/util/UDFContext.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/impl/util/UDFContext.java Wed Oct 22 17:41:16 2014
@@ -24,6 +24,8 @@ import java.util.HashMap;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.JVMReuseManager;
+import org.apache.pig.StaticDataCleanup;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
public class UDFContext {
@@ -41,6 +43,10 @@ public class UDFContext {
}
};
+ // Hand this class to JVMReuseManager, which records its public static
+ // methods annotated with @StaticDataCleanup.
+ static {
+ JVMReuseManager.getInstance().registerForStaticDataCleanup(UDFContext.class);
+ }
+
private UDFContext() {
udfConfs = new HashMap<UDFContextKey, Properties>();
}
@@ -62,7 +68,8 @@ public class UDFContext {
/*
* internal pig use only - should NOT be called from user code
*/
- public static void destroy() {
+ @StaticDataCleanup
+ public static void cleanupStaticData() {
tss = new ThreadLocal<UDFContext>() {
@Override
public UDFContext initialValue() {
Modified: pig/branches/branch-0.14/src/org/apache/pig/tools/pigstats/PigStatusReporter.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/tools/pigstats/PigStatusReporter.java?rev=1633662&r1=1633661&r2=1633662&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/tools/pigstats/PigStatusReporter.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/tools/pigstats/PigStatusReporter.java Wed Oct 22 17:41:16 2014
@@ -21,6 +21,8 @@ package org.apache.pig.tools.pigstats;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.StatusReporter;
import org.apache.hadoop.util.Progressable;
+import org.apache.pig.JVMReuseManager;
+import org.apache.pig.StaticDataCleanup;
import org.apache.pig.backend.hadoop.executionengine.TaskContext;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
@@ -33,6 +35,15 @@ public class PigStatusReporter extends S
private TaskContext<?> context = null;
+ // Hand this class to JVMReuseManager, which records its public static
+ // methods annotated with @StaticDataCleanup.
+ static {
+ JVMReuseManager.getInstance().registerForStaticDataCleanup(PigStatusReporter.class);
+ }
+
+ /**
+ * Static data cleanup hook: sets the static reporter reference to null.
+ */
+ @StaticDataCleanup
+ public static void staticDataCleanup() {
+ reporter = null;
+ }
+
private PigStatusReporter() {
}
@@ -46,10 +57,6 @@ public class PigStatusReporter extends S
return reporter;
}
- public void destroy() {
- context = null;
- }
-
public void setContext(TaskContext<?> context) {
this.context = context;
}