You are viewing a plain-text version of this content. (The canonical link for it was a hyperlink on the word "here", which is not preserved in this plain-text copy.)
Posted to commits@pig.apache.org by ro...@apache.org on 2014/04/21 19:35:49 UTC

svn commit: r1588935 - in /pig/branches/tez/src/org/apache/pig: backend/hadoop/executionengine/tez/PigProcessor.java backend/hadoop/executionengine/tez/PigTezLogger.java data/DefaultAbstractBag.java

Author: rohini
Date: Mon Apr 21 17:35:49 2014
New Revision: 1588935

URL: http://svn.apache.org/r1588935
Log:
PIG-3899: Fix memory leak with PigTezLogger (rohini)

Modified:
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigTezLogger.java
    pig/branches/tez/src/org/apache/pig/data/DefaultAbstractBag.java

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java?rev=1588935&r1=1588934&r2=1588935&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java Mon Apr 21 17:35:49 2014
@@ -67,6 +67,7 @@ public class PigProcessor implements Log
     private PhysicalOperator leaf;
 
     private Configuration conf;
+    private PigTezLogger pigTezLogger;
 
     public static String sampleVertex;
     public static Map<String, Object> sampleMap;
@@ -102,8 +103,7 @@ public class PigProcessor implements Log
 
         boolean aggregateWarning = "true".equalsIgnoreCase(pc.getProperties().getProperty("aggregate.warning"));
 
-        PigTezLogger pigTezLogger = new PigTezLogger(new TezStatusReporter(processorContext), aggregateWarning);
-
+        pigTezLogger = new PigTezLogger(new TezStatusReporter(processorContext), aggregateWarning);
         PhysicalOperator.setPigLogger(pigTezLogger);
 
         LinkedList<TezTaskConfigurable> tezTCs = PlanHelper.getPhysicalOperators(execPlan, TezTaskConfigurable.class);
@@ -121,6 +121,8 @@ public class PigProcessor implements Log
     @Override
     public void close() throws Exception {
         // Avoid memory leak. ThreadLocals especially leak a lot of memory.
+        // The Reporter and Context objects hold TezProcessorContextImpl
+        // which holds input and its sort buffers which are huge.
         PhysicalOperator.reporter = new ThreadLocal<PigProgressable>();
         PigMapReduce.sJobConfInternal = new ThreadLocal<Configuration>();
         PigMapReduce.sJobContext = null;
@@ -130,9 +132,12 @@ public class PigProcessor implements Log
         conf = null;
         sampleMap = null;
         sampleVertex = null;
+        if (pigTezLogger != null) {
+            pigTezLogger.destroy();
+            pigTezLogger = null;
+        }
     }
 
-    @SuppressWarnings("rawtypes")
     @Override
     public void run(Map<String, LogicalInput> inputs,
             Map<String, LogicalOutput> outputs) throws Exception {
@@ -256,7 +261,7 @@ public class PigProcessor implements Log
         }
     }
 
-    @SuppressWarnings({ "unchecked", "rawtypes" })
+    @SuppressWarnings({ "unchecked" })
     private void collectSample(String sampleVertex, LogicalInput logicalInput) throws Exception {
         Boolean cached = (Boolean) ObjectCache.getInstance().retrieve("cached.sample." + sampleVertex);
         if (cached == Boolean.TRUE) {

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigTezLogger.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigTezLogger.java?rev=1588935&r1=1588934&r2=1588935&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigTezLogger.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigTezLogger.java Mon Apr 21 17:35:49 2014
@@ -42,6 +42,7 @@ public class PigTezLogger implements Pig
         this.aggregate = aggregate;
     }
 
+    @Override
     @SuppressWarnings("rawtypes")
     public void warn(Object o, String msg, Enum warningEnum) {
         String className = o.getClass().getName();
@@ -75,4 +76,8 @@ public class PigTezLogger implements Pig
         return aggregate;
     }
 
+    public void destroy() {
+        this.reporter = null;
+    }
+
 }
\ No newline at end of file

Modified: pig/branches/tez/src/org/apache/pig/data/DefaultAbstractBag.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/data/DefaultAbstractBag.java?rev=1588935&r1=1588934&r2=1588935&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/data/DefaultAbstractBag.java (original)
+++ pig/branches/tez/src/org/apache/pig/data/DefaultAbstractBag.java Mon Apr 21 17:35:49 2014
@@ -51,7 +51,7 @@ public abstract class DefaultAbstractBag
     // If we grow past 100K, may be worthwhile to register.
     private static final int SPILL_REGISTER_THRESHOLD = 100 * 1024;
 
-    private static PigLogger pigLogger = PhysicalOperator.getPigLogger();
+    private static PigLogger pigLogger;
 
     private static InterSedes sedes = InterSedesFactory.getInterSedesInstance();
     // Container that holds the tuples. Actual object instantiated by
@@ -85,7 +85,7 @@ public abstract class DefaultAbstractBag
 
 
     /**
-     * Sample every SPILL_SAMPLE_FREQUENCYth tuple 
+     * Sample every SPILL_SAMPLE_FREQUENCYth tuple
      * until we reach a max of SPILL_SAMPLE_SIZE
      * to get an estimate of the tuple sizes.
      */