You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/05/02 22:57:57 UTC
svn commit: r652906 - in /incubator/pig/trunk: ./ conf/ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/
src/org/apache/pig/impl/util/
Author: gates
Date: Fri May 2 13:57:56 2008
New Revision: 652906
URL: http://svn.apache.org/viewvc?rev=652906&view=rev
Log:
PIG-176: Change bag spilling so that bags below a certain threshold are not spilled, thus avoiding proliferation of small files.
Added:
incubator/pig/trunk/src/org/apache/pig/impl/util/ConfigurationValidator.java
Modified:
incubator/pig/trunk/CHANGES.txt
incubator/pig/trunk/conf/pig.properties
incubator/pig/trunk/src/org/apache/pig/PigServer.java
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java
incubator/pig/trunk/src/org/apache/pig/impl/util/PropertiesUtil.java
incubator/pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java
Modified: incubator/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=652906&r1=652905&r2=652906&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Fri May 2 13:57:56 2008
@@ -264,3 +264,6 @@
PIG-224: fix to error handling code to produce correct error code
+ PIG-176: Change bag spilling so that bags below a certain threshold are
+ not spilled, thus avoiding proliferation of small files (pi_song via
+ gates).
Modified: incubator/pig/trunk/conf/pig.properties
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/conf/pig.properties?rev=652906&r1=652905&r2=652906&view=diff
==============================================================================
--- incubator/pig/trunk/conf/pig.properties (original)
+++ incubator/pig/trunk/conf/pig.properties Fri May 2 13:57:56 2008
@@ -32,3 +32,11 @@
#hod.config.dir
#hod.param
+
+#Do not spill temp files smaller than this size (bytes)
+#pig.spill.size.threshold=100000
+
+#EXPERIMENT: Activate garbage collection when spilling a file bigger than this size (bytes)
+#This should help reduce the number of files being spilled.
+#pig.spill.gc.activation.size=100000000
+
Modified: incubator/pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/PigServer.java?rev=652906&r1=652905&r2=652906&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/PigServer.java Fri May 2 13:57:56 2008
@@ -133,10 +133,6 @@
this(execType, PropertiesUtil.loadPropertiesFromFile());
}
- public PigServer() throws ExecException {
- this(ExecType.MAPREDUCE, new Properties());
- }
-
public PigServer(ExecType execType, Properties properties) throws ExecException {
this.pigContext = new PigContext(execType, properties);
if (this.pigContext.getProperties().getProperty(PigContext.JOB_NAME) == null) {
Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java?rev=652906&r1=652905&r2=652906&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java Fri May 2 13:57:56 2008
@@ -60,6 +60,7 @@
import org.apache.pig.impl.util.JarManager;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.WrappedIOException;
+import org.apache.pig.impl.util.ConfigurationValidator;
/**
@@ -133,6 +134,7 @@
JobConf conf = new JobConf(config);
setJobProperties(conf, pom);
Properties properties = pom.pigContext.getProperties();
+ ConfigurationValidator.validatePigProperties(properties) ;
String jobName = properties.getProperty(PigContext.JOB_NAME);
conf.setJobName(jobName);
boolean success = false;
@@ -160,6 +162,11 @@
String user = System.getProperty("user.name");
conf.setUser(user != null ? user : "Pigster");
+ conf.set("pig.spill.size.threshold",
+ properties.getProperty("pig.spill.size.threshold")) ;
+ conf.set("pig.spill.gc.activation.size",
+ properties.getProperty("pig.spill.gc.activation.size")) ;
+
if (pom.reduceParallelism != -1) {
conf.setNumReduceTasks(pom.reduceParallelism);
}
Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java?rev=652906&r1=652905&r2=652906&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java Fri May 2 13:57:56 2008
@@ -51,6 +51,7 @@
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.logicalLayer.LOCogroup;
import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.SpillableMemoryManager;
/**
@@ -193,6 +194,12 @@
jobConf.get("mapred.task.id"));
properties.setProperty("pig.streaming.task.output.dir",
jobConf.getOutputPath().toString());
+ properties.setProperty("pig.spill.size.threshold",
+ jobConf.get("pig.spill.size.threshold"));
+ properties.setProperty("pig.spill.gc.activation.size",
+ jobConf.get("pig.spill.gc.activation.size"));
+
+ SpillableMemoryManager.configure(properties) ;
}
/**
Added: incubator/pig/trunk/src/org/apache/pig/impl/util/ConfigurationValidator.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/util/ConfigurationValidator.java?rev=652906&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/util/ConfigurationValidator.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/impl/util/ConfigurationValidator.java Fri May 2 13:57:56 2008
@@ -0,0 +1,38 @@
+package org.apache.pig.impl.util;
+
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/** Validates Pig configuration properties, defaulting missing or malformed values. */
+public class ConfigurationValidator {
+
+    private static final Log log = LogFactory.getLog(ConfigurationValidator.class);
+
+    /**
+     * All pig configurations should be validated in here before use.
+     * @param properties properties to validate; invalid entries are fixed up in place
+     */
+    public static void validatePigProperties(Properties properties) {
+        ensureLongType(properties, "pig.spill.size.threshold", 0L);
+        ensureLongType(properties, "pig.spill.gc.activation.size", Long.MAX_VALUE);
+    }
+
+    /**
+     * Ensures {@code key} holds a long-parsable string, storing {@code defaultValue} if not.
+     * A missing key is defaulted silently; an unparsable value is logged as an error first.
+     */
+    private static void ensureLongType(Properties properties, String key, long defaultValue) {
+        String str = properties.getProperty(key);
+        if (str != null) {
+            try {
+                Long.parseLong(str);
+                return; // value is already valid, keep it untouched
+            } catch (NumberFormatException nfe) {
+                log.error(key + " has to be parsable to long (got '" + str + "')");
+            }
+        }
+        properties.setProperty(key, Long.toString(defaultValue));
+    }
+}
Modified: incubator/pig/trunk/src/org/apache/pig/impl/util/PropertiesUtil.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/util/PropertiesUtil.java?rev=652906&r1=652905&r2=652906&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/util/PropertiesUtil.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/util/PropertiesUtil.java Fri May 2 13:57:56 2008
@@ -55,6 +55,9 @@
properties.put(entry.getKey(), entry.getValue());
}
}
+
+ // Validate right after loading so that configuration problems surface early
+ ConfigurationValidator.validatePigProperties(properties) ;
}
public static Properties loadPropertiesFromFile() {
@@ -62,4 +65,5 @@
loadPropertiesFromFile(properties);
return properties;
}
+
}
Modified: incubator/pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java?rev=652906&r1=652905&r2=652906&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java Fri May 2 13:57:56 2008
@@ -10,6 +10,7 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.Properties;
import javax.management.Notification;
import javax.management.NotificationEmitter;
@@ -34,6 +35,9 @@
LinkedList<WeakReference<Spillable>> spillables = new LinkedList<WeakReference<Spillable>>();
+ private static long gcActivationSize = Long.MAX_VALUE ; // bytes; a single spill larger than this triggers System.gc()
+ private static long spillFileSizeThreshold = 0L ; // bytes; spillables smaller than this are not spilled
+
public SpillableMemoryManager() {
((NotificationEmitter)ManagementFactory.getMemoryMXBean()).addNotificationListener(this, null, null);
List<MemoryPoolMXBean> mpbeans = ManagementFactory.getMemoryPoolMXBeans();
@@ -63,6 +67,21 @@
biggestHeap.setCollectionUsageThreshold((long)(biggestSize*.5));
}
+    /** Loads both spill settings from {@code properties}.
+     *  @throws RuntimeException if either value is missing or not parsable as a long
+     */
+    public static void configure(Properties properties) {
+        try {
+            long threshold = Long.parseLong(properties.getProperty("pig.spill.size.threshold"));
+            long gcSize = Long.parseLong(properties.getProperty("pig.spill.gc.activation.size"));
+            spillFileSizeThreshold = threshold; // assign only after both values have parsed
+            gcActivationSize = gcSize;
+        } catch (NumberFormatException nfe) {
+            throw new RuntimeException("Error while converting system configurations "
+                    + "spill.size.threshold, spill.gc.activation.size", nfe);
+        }
+    }
+
public void handleNotification(Notification n, Object o) {
CompositeData cd = (CompositeData) n.getUserData();
MemoryNotificationInfo info = MemoryNotificationInfo.from(cd);
@@ -124,8 +143,21 @@
continue;
}
long toBeFreed = s.getMemorySize();
- s.spill();
+ // Don't keep trying if the rest of files are too small
+ if (toBeFreed < spillFileSizeThreshold) {
+ break ;
+ }
+
+ s.spill();
+
+ // Forcing a GC after a large spill should significantly reduce the number
+ // of small files when there are many nested bags
+ if (toBeFreed > gcActivationSize) {
+ System.gc();
+ }
+
estimatedFreed += toBeFreed;
+
if (estimatedFreed > toFree) {
break;
}