You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/04/21 19:35:49 UTC
svn commit: r1588935 - in /pig/branches/tez/src/org/apache/pig:
backend/hadoop/executionengine/tez/PigProcessor.java
backend/hadoop/executionengine/tez/PigTezLogger.java
data/DefaultAbstractBag.java
Author: rohini
Date: Mon Apr 21 17:35:49 2014
New Revision: 1588935
URL: http://svn.apache.org/r1588935
Log:
PIG-3899: Fix memory leak with PigTezLogger (rohini)
Modified:
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigTezLogger.java
pig/branches/tez/src/org/apache/pig/data/DefaultAbstractBag.java
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java?rev=1588935&r1=1588934&r2=1588935&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java Mon Apr 21 17:35:49 2014
@@ -67,6 +67,7 @@ public class PigProcessor implements Log
private PhysicalOperator leaf;
private Configuration conf;
+ private PigTezLogger pigTezLogger;
public static String sampleVertex;
public static Map<String, Object> sampleMap;
@@ -102,8 +103,7 @@ public class PigProcessor implements Log
boolean aggregateWarning = "true".equalsIgnoreCase(pc.getProperties().getProperty("aggregate.warning"));
- PigTezLogger pigTezLogger = new PigTezLogger(new TezStatusReporter(processorContext), aggregateWarning);
-
+ pigTezLogger = new PigTezLogger(new TezStatusReporter(processorContext), aggregateWarning);
PhysicalOperator.setPigLogger(pigTezLogger);
LinkedList<TezTaskConfigurable> tezTCs = PlanHelper.getPhysicalOperators(execPlan, TezTaskConfigurable.class);
@@ -121,6 +121,8 @@ public class PigProcessor implements Log
@Override
public void close() throws Exception {
// Avoid memory leak. ThreadLocals especially leak a lot of memory.
+ // The Reporter and Context objects hold TezProcessorContextImpl
+ // which holds input and its sort buffers which are huge.
PhysicalOperator.reporter = new ThreadLocal<PigProgressable>();
PigMapReduce.sJobConfInternal = new ThreadLocal<Configuration>();
PigMapReduce.sJobContext = null;
@@ -130,9 +132,12 @@ public class PigProcessor implements Log
conf = null;
sampleMap = null;
sampleVertex = null;
+ if (pigTezLogger != null) {
+ pigTezLogger.destroy();
+ pigTezLogger = null;
+ }
}
- @SuppressWarnings("rawtypes")
@Override
public void run(Map<String, LogicalInput> inputs,
Map<String, LogicalOutput> outputs) throws Exception {
@@ -256,7 +261,7 @@ public class PigProcessor implements Log
}
}
- @SuppressWarnings({ "unchecked", "rawtypes" })
+ @SuppressWarnings({ "unchecked" })
private void collectSample(String sampleVertex, LogicalInput logicalInput) throws Exception {
Boolean cached = (Boolean) ObjectCache.getInstance().retrieve("cached.sample." + sampleVertex);
if (cached == Boolean.TRUE) {
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigTezLogger.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigTezLogger.java?rev=1588935&r1=1588934&r2=1588935&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigTezLogger.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigTezLogger.java Mon Apr 21 17:35:49 2014
@@ -42,6 +42,7 @@ public class PigTezLogger implements Pig
this.aggregate = aggregate;
}
+ @Override
@SuppressWarnings("rawtypes")
public void warn(Object o, String msg, Enum warningEnum) {
String className = o.getClass().getName();
@@ -75,4 +76,8 @@ public class PigTezLogger implements Pig
return aggregate;
}
+ public void destroy() {
+ this.reporter = null;
+ }
+
}
\ No newline at end of file
Modified: pig/branches/tez/src/org/apache/pig/data/DefaultAbstractBag.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/data/DefaultAbstractBag.java?rev=1588935&r1=1588934&r2=1588935&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/data/DefaultAbstractBag.java (original)
+++ pig/branches/tez/src/org/apache/pig/data/DefaultAbstractBag.java Mon Apr 21 17:35:49 2014
@@ -51,7 +51,7 @@ public abstract class DefaultAbstractBag
// If we grow past 100K, may be worthwhile to register.
private static final int SPILL_REGISTER_THRESHOLD = 100 * 1024;
- private static PigLogger pigLogger = PhysicalOperator.getPigLogger();
+ private static PigLogger pigLogger;
private static InterSedes sedes = InterSedesFactory.getInterSedesInstance();
// Container that holds the tuples. Actual object instantiated by
@@ -85,7 +85,7 @@ public abstract class DefaultAbstractBag
/**
- * Sample every SPILL_SAMPLE_FREQUENCYth tuple
+ * Sample every SPILL_SAMPLE_FREQUENCYth tuple
* until we reach a max of SPILL_SAMPLE_SIZE
* to get an estimate of the tuple sizes.
*/