You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/05/02 22:57:57 UTC

svn commit: r652906 - in /incubator/pig/trunk: ./ conf/ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/ src/org/apache/pig/impl/util/

Author: gates
Date: Fri May  2 13:57:56 2008
New Revision: 652906

URL: http://svn.apache.org/viewvc?rev=652906&view=rev
Log:
PIG-176: Change bag spilling so that bags below a certain threshold are not spilled, thus avoiding proliferation of small files.


Added:
    incubator/pig/trunk/src/org/apache/pig/impl/util/ConfigurationValidator.java
Modified:
    incubator/pig/trunk/CHANGES.txt
    incubator/pig/trunk/conf/pig.properties
    incubator/pig/trunk/src/org/apache/pig/PigServer.java
    incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java
    incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java
    incubator/pig/trunk/src/org/apache/pig/impl/util/PropertiesUtil.java
    incubator/pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java

Modified: incubator/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=652906&r1=652905&r2=652906&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Fri May  2 13:57:56 2008
@@ -264,3 +264,6 @@
 
     PIG-224: fix to error handling code to produce correct error code
 
+	PIG-176: Change bag spilling so that bags below a certain threshold are
+	not spilled, thus avoiding proliferation of small files (pi_song via
+	gates).

Modified: incubator/pig/trunk/conf/pig.properties
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/conf/pig.properties?rev=652906&r1=652905&r2=652906&view=diff
==============================================================================
--- incubator/pig/trunk/conf/pig.properties (original)
+++ incubator/pig/trunk/conf/pig.properties Fri May  2 13:57:56 2008
@@ -32,3 +32,11 @@
 #hod.config.dir
 #hod.param
 
+
+#Do not spill temp files smaller than this size (bytes)
+#pig.spill.size.threshold=100000
+
+#EXPERIMENT: Activate garbage collection when spilling a file bigger than this size (bytes)
+#This should help reduce the number of files being spilled.
+#pig.spill.gc.activation.size=100000000
+

Modified: incubator/pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/PigServer.java?rev=652906&r1=652905&r2=652906&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/PigServer.java Fri May  2 13:57:56 2008
@@ -133,10 +133,6 @@
         this(execType, PropertiesUtil.loadPropertiesFromFile());
     }
 
-    public PigServer() throws ExecException {
-        this(ExecType.MAPREDUCE, new Properties());
-    }
-
     public PigServer(ExecType execType, Properties properties) throws ExecException {
         this.pigContext = new PigContext(execType, properties);
         if (this.pigContext.getProperties().getProperty(PigContext.JOB_NAME) == null) {

Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java?rev=652906&r1=652905&r2=652906&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java Fri May  2 13:57:56 2008
@@ -60,6 +60,7 @@
 import org.apache.pig.impl.util.JarManager;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.WrappedIOException;
+import org.apache.pig.impl.util.ConfigurationValidator;
 
 
 /**
@@ -133,6 +134,7 @@
         JobConf conf = new JobConf(config);
         setJobProperties(conf, pom);
         Properties properties = pom.pigContext.getProperties();
+        ConfigurationValidator.validatePigProperties(properties) ;
         String jobName = properties.getProperty(PigContext.JOB_NAME);
         conf.setJobName(jobName);
         boolean success = false;
@@ -160,6 +162,11 @@
             String user = System.getProperty("user.name");
             conf.setUser(user != null ? user : "Pigster");
 
+            conf.set("pig.spill.size.threshold", 
+                     properties.getProperty("pig.spill.size.threshold")) ;           
+            conf.set("pig.spill.gc.activation.size", 
+                    properties.getProperty("pig.spill.gc.activation.size")) ;                      
+           
             if (pom.reduceParallelism != -1) {
                 conf.setNumReduceTasks(pom.reduceParallelism);
             }

Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java?rev=652906&r1=652905&r2=652906&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java Fri May  2 13:57:56 2008
@@ -51,6 +51,7 @@
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.logicalLayer.LOCogroup;
 import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.SpillableMemoryManager;
 
 
 /**
@@ -193,6 +194,12 @@
                                jobConf.get("mapred.task.id"));
         properties.setProperty("pig.streaming.task.output.dir", 
                                jobConf.getOutputPath().toString());
+        properties.setProperty("pig.spill.size.threshold", 
+                                jobConf.get("pig.spill.size.threshold"));
+        properties.setProperty("pig.spill.gc.activation.size", 
+                                jobConf.get("pig.spill.gc.activation.size"));
+ 
+        SpillableMemoryManager.configure(properties) ;
     }
 
     /**

Added: incubator/pig/trunk/src/org/apache/pig/impl/util/ConfigurationValidator.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/util/ConfigurationValidator.java?rev=652906&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/util/ConfigurationValidator.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/util/ConfigurationValidator.java Fri May  2 13:57:56 2008
@@ -0,0 +1,38 @@
+package org.apache.pig.impl.util;
+
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/** Validates Pig configuration properties, replacing missing or unusable values with defaults. */
+public class ConfigurationValidator {
+
+    private final static Log log = LogFactory.getLog(ConfigurationValidator.class);
+
+    /**
+     * All pig configurations should be validated in here before use.
+     * Spill settings that are absent or not parsable as long are overwritten with defaults.
+     * @param properties the properties to validate; modified in place
+     */
+    public static void validatePigProperties(Properties properties) {
+        ensureLongType(properties, "pig.spill.size.threshold", 0L) ;
+        ensureLongType(properties, "pig.spill.gc.activation.size", Long.MAX_VALUE) ;
+    }
+
+    /** Sets {@code key} to {@code defaultValue} when it is absent or not parsable as a long. */
+    private static void ensureLongType(Properties properties, String key, long defaultValue) {
+        String str = properties.getProperty(key) ;
+        if (str != null) {
+            try {
+                Long.parseLong(str) ;
+                return ;
+            }
+            catch (NumberFormatException nfe) {
+                // Report the key along with the bad value so the log says which setting to fix.
+                log.error(key + "=" + str + " has to be parsable to long; using default " + defaultValue) ;
+            }
+        }
+        properties.setProperty(key, Long.toString(defaultValue)) ;
+    }
+}

Modified: incubator/pig/trunk/src/org/apache/pig/impl/util/PropertiesUtil.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/util/PropertiesUtil.java?rev=652906&r1=652905&r2=652906&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/util/PropertiesUtil.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/util/PropertiesUtil.java Fri May  2 13:57:56 2008
@@ -55,6 +55,9 @@
 				properties.put(entry.getKey(), entry.getValue());
 			}
 		}
+		
+		// Validate immediately so that configuration problems are reported as early as possible
+		ConfigurationValidator.validatePigProperties(properties) ;
     }
 
     public static Properties loadPropertiesFromFile() {
@@ -62,4 +65,5 @@
         loadPropertiesFromFile(properties);
         return properties;
     }
+    
 }

Modified: incubator/pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java?rev=652906&r1=652905&r2=652906&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java Fri May  2 13:57:56 2008
@@ -10,6 +10,7 @@
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Properties;
 
 import javax.management.Notification;
 import javax.management.NotificationEmitter;
@@ -34,6 +35,9 @@
     
     LinkedList<WeakReference<Spillable>> spillables = new LinkedList<WeakReference<Spillable>>();
     
+    private static long gcActivationSize = Long.MAX_VALUE ;
+    private static long spillFileSizeThreshold = 0L ;
+    
     public SpillableMemoryManager() {
         ((NotificationEmitter)ManagementFactory.getMemoryMXBean()).addNotificationListener(this, null, null);
         List<MemoryPoolMXBean> mpbeans = ManagementFactory.getMemoryPoolMXBeans();
@@ -63,6 +67,21 @@
         biggestHeap.setCollectionUsageThreshold((long)(biggestSize*.5));    
     }
     
+    /** Loads both spill settings; throws RuntimeException if either is missing or not a long. */
+    public static void configure(Properties properties) {
+        try {
+            // Parse both values before assigning so a bad one cannot leave a half-applied config.
+            long threshold = Long.parseLong(properties.getProperty("pig.spill.size.threshold")) ;
+            long gcSize = Long.parseLong(properties.getProperty("pig.spill.gc.activation.size")) ;
+            spillFileSizeThreshold = threshold ;
+            gcActivationSize = gcSize ;
+        }
+        catch (NumberFormatException nfe) {
+            throw new RuntimeException("Error while converting system configurations "
+                    + "pig.spill.size.threshold, pig.spill.gc.activation.size", nfe) ;
+        }
+    }
+    
     public void handleNotification(Notification n, Object o) {
         CompositeData cd = (CompositeData) n.getUserData();
         MemoryNotificationInfo info = MemoryNotificationInfo.from(cd);
@@ -124,8 +143,21 @@
                     continue;
                 }
                 long toBeFreed = s.getMemorySize();
-                s.spill();
+                // Don't keep trying if the remaining files are too small
+                if (toBeFreed < spillFileSizeThreshold) {
+                    break ;
+                }
+                
+                s.spill();               
+                
+                // This should significantly reduce the number of small files
+                // when we have a lot of nested bags
+                if (toBeFreed > gcActivationSize) {
+                    System.gc();
+                }
+                
                 estimatedFreed += toBeFreed;
+                
                 if (estimatedFreed > toFree) {
                     break;
                 }