You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/03/20 22:32:24 UTC

svn commit: r639469 - in /incubator/pig/trunk: ./ bin/ lib-src/shock/org/apache/pig/shock/ lib/ src/org/apache/pig/ src/org/apache/pig/backend/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/impl/io/

Author: olga
Date: Thu Mar 20 14:32:22 2008
New Revision: 639469

URL: http://svn.apache.org/viewvc?rev=639469&view=rev
Log:
PIG-18: changes to make pig work with hadoop 0.16 and HOD-0.4

Added:
    incubator/pig/trunk/lib/hadoop16.jar   (with props)
Removed:
    incubator/pig/trunk/bin/startHOD.expect
    incubator/pig/trunk/lib/hadoop14.jar
Modified:
    incubator/pig/trunk/CHANGES.txt
    incubator/pig/trunk/build.xml
    incubator/pig/trunk/lib-src/shock/org/apache/pig/shock/SSHSocketImplFactory.java
    incubator/pig/trunk/src/org/apache/pig/Main.java
    incubator/pig/trunk/src/org/apache/pig/backend/executionengine/ExecException.java
    incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java

Modified: incubator/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=639469&r1=639468&r2=639469&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Thu Mar 20 14:32:22 2008
@@ -5,6 +5,8 @@
 
   INCOMPATIBLE CHANGES
 
+  PIG-123: requires escape of '\' in chars and string
+
   NEW FEATURES
 
   OPTIMIZATIONS
@@ -169,6 +171,8 @@
 
 	PIG-106:  Change StringBuffer and String '+' to StringBuilder (francisoud
 	via gates).
+
+    PIG-18: changes to make pig work with Hadoop 0.16 and HOD 0.4 (olgan)
 
 	PIG-164:  Fix memory issue in SpillableMemoryManager to partially clean the list of
 	bags each time a new bag is added rather than waiting until the garbage

Modified: incubator/pig/trunk/build.xml
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/build.xml?rev=639469&r1=639468&r2=639469&view=diff
==============================================================================
--- incubator/pig/trunk/build.xml (original)
+++ incubator/pig/trunk/build.xml Thu Mar 20 14:32:22 2008
@@ -40,7 +40,7 @@
     <property name="dist.dir" value="${build.dir}/${final.name}" />
     <property name="build.encoding" value="ISO-8859-1" />
     <!-- TODO with only one version of hadoop in the lib folder we do not need that anymore -->
-    <property name="hadoop.jarfile" value="hadoop15.jar" />
+    <property name="hadoop.jarfile" value="hadoop16.jar" />
 
     <!-- jar names. TODO we might want to use the svn reversion name in the name in case it is a dev version -->
     <property name="output.jarfile" value="${build.dir}/${final.name}.jar" />

Modified: incubator/pig/trunk/lib-src/shock/org/apache/pig/shock/SSHSocketImplFactory.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/lib-src/shock/org/apache/pig/shock/SSHSocketImplFactory.java?rev=639469&r1=639468&r2=639469&view=diff
==============================================================================
--- incubator/pig/trunk/lib-src/shock/org/apache/pig/shock/SSHSocketImplFactory.java (original)
+++ incubator/pig/trunk/lib-src/shock/org/apache/pig/shock/SSHSocketImplFactory.java Thu Mar 20 14:32:22 2008
@@ -30,6 +30,7 @@
 import java.net.SocketAddress;
 import java.net.SocketException;
 import java.net.SocketImpl;
+import java.net.SocketOptions;
 import java.net.SocketImplFactory;
 import java.net.UnknownHostException;
 import java.net.Proxy.Type;
@@ -434,9 +435,12 @@
 	protected void sendUrgentData(int data) throws IOException {
 		throw new IOException("SSHSocketImpl does not implement sendUrgentData");
 	}
-
+    @Override
 	public Object getOption(int optID) throws SocketException {
-		throw new SocketException("SSHSocketImpl does not implement getOption");
+        if (optID == SocketOptions.SO_SNDBUF)
+            return new Integer(1024);
+        else
+		    throw new SocketException("SSHSocketImpl does not implement getOption for " + optID);
 	}
 
 	/**

Added: incubator/pig/trunk/lib/hadoop16.jar
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/lib/hadoop16.jar?rev=639469&view=auto
==============================================================================
Binary file - no diff available.

Propchange: incubator/pig/trunk/lib/hadoop16.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Modified: incubator/pig/trunk/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/Main.java?rev=639469&r1=639468&r2=639469&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/Main.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/Main.java Thu Mar 20 14:32:22 2008
@@ -36,6 +36,7 @@
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
 import org.apache.pig.impl.util.JarManager;
+import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.tools.cmdline.CmdLineParser;
 import org.apache.pig.tools.grunt.Grunt;
 import org.apache.pig.tools.timer.PerformanceTimerFactory;
@@ -274,8 +275,12 @@
         usage();
         rc = 1;
     } catch (Throwable e) {
-        log.error(e);
+        //log.error(e);
+        // this is a hack to see full error till we resolve commons logging config
+        e.printStackTrace();
     } finally {
+        // clear temp files
+        FileLocalizer.deleteTempFiles();
         PerformanceTimerFactory.getPerfTimerFactory().dumpTimers();
         System.exit(rc);
     }

Modified: incubator/pig/trunk/src/org/apache/pig/backend/executionengine/ExecException.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/executionengine/ExecException.java?rev=639469&r1=639468&r2=639469&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/executionengine/ExecException.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/executionengine/ExecException.java Thu Mar 20 14:32:22 2008
@@ -8,14 +8,14 @@
     }
 
     public ExecException() {
-        this(null, null);
+        super();
     }
     
     public ExecException(String message) {
-        this(message, null);
+        super(message);
     }
     
     public ExecException(Throwable cause) {
-        this(null, cause);
+        super(cause);
     }
 }

Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=639469&r1=639468&r2=639469&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Thu Mar 20 14:32:22 2008
@@ -1,7 +1,12 @@
 package org.apache.pig.backend.hadoop.executionengine;
 
+import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.FileOutputStream;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.net.InetAddress;
 import java.net.Socket;
 import java.net.SocketException;
@@ -43,6 +48,7 @@
 public class HExecutionEngine implements ExecutionEngine {
     
     private final Log log = LogFactory.getLog(getClass());
+    private static final String LOCAL = "local";
     
     protected PigContext pigContext;
     
@@ -120,14 +126,14 @@
         }
         else {
             if (cluster != null && cluster.length() > 0) {
-                if(cluster.indexOf(':') < 0 && !cluster.equalsIgnoreCase("local")) {
+                if(cluster.indexOf(':') < 0 && !cluster.equalsIgnoreCase(LOCAL)) {
                     cluster = cluster + ":50020";
                 }
                 setJobtrackerLocation(cluster);
             }
 
             if (nameNode!=null && nameNode.length() > 0) {
-                if(nameNode.indexOf(':') < 0 && !nameNode.equalsIgnoreCase("local")) {
+                if(nameNode.indexOf(':') < 0 && !nameNode.equalsIgnoreCase(LOCAL)) {
                     nameNode = nameNode + ":8020";
                 }
                 setFilesystemLocation(nameNode);
@@ -143,7 +149,7 @@
             throw new ExecException("Failed to create DataStorage", e);
         }
             
-        if(cluster != null && !cluster.equalsIgnoreCase("local")){
+        if(cluster != null && !cluster.equalsIgnoreCase(LOCAL)){
 	        log.info("Connecting to map-reduce job tracker at: " + conf.get("mapred.job.tracker"));
 	        
 	        try {
@@ -166,7 +172,7 @@
     }
 
     public void close() throws ExecException {
-        ;
+            closeHod(System.getProperty("hod.server"));
     }
         
     public Properties getConfiguration() throws ExecException {
@@ -315,10 +321,15 @@
     //To prevent doing hod if the pig server is constructed multiple times
     private static String hodMapRed;
     private static String hodHDFS;
-
-    private enum ParsingState {
-        NOTHING, HDFSUI, MAPREDUI, HDFS, MAPRED, HADOOPCONF
-    };
+    private String hodConfDir = null; 
+    private String remoteHodConfDir = null; 
+    private Process hodProcess = null;
+
+    class ShutdownThread extends Thread{
+        public synchronized void run() {
+            closeHod(System.getProperty("hod.server"));
+        }
+    }
     
     private String[] doHod(String server) throws ExecException {
         if (hodMapRed != null) {
@@ -326,125 +337,69 @@
         }
         
         try {
-            Process p = null;
-            // Make the kryptonite released version the default if nothing
-            // else is specified.
-            StringBuilder cmd = new StringBuilder();
-            cmd.append(System.getProperty("hod.expect.root"));
-            cmd.append('/');
-            cmd.append("libexec/pig/");
-            cmd.append(System.getProperty("hod.expect.uselatest"));
-            cmd.append('/');
-            cmd.append(System.getProperty("hod.command"));
-
-            String cluster = System.getProperty("yinst.cluster");
-           
-            // TODO This is a Yahoo specific holdover, need to remove
-            // this.
-            if (cluster != null && cluster.length() > 0 && !cluster.startsWith("kryptonite")) {
-                cmd.append(" --config=");
-                cmd.append(System.getProperty("hod.config.dir"));
-                cmd.append('/');
-                cmd.append(cluster);
-            }
+            // first, create temp directory to store the configuration
+            hodConfDir = createTempDir(server);
+			
+			// get the number of nodes out of the command or use default
+            StringBuilder hodParams = new StringBuilder(System.getProperty("hod.param", ""));
+			int nodes = getNumNodes(hodParams);
+
+            // command format: hod allocate -d <cluster_dir> -n <number_of_nodes> <other params>
+            String[] cmdarray = new String[7];
+            cmdarray[0] = "hod";
+            cmdarray[1] = "allocate";
+            cmdarray[2] = "-d";
+            cmdarray[3] = hodConfDir;
+            cmdarray[4] = "-n";
+            cmdarray[5] = Integer.toString(nodes);
+            cmdarray[6] = hodParams.toString();
 
-            cmd.append(" " + System.getProperty("hod.param", ""));
+            log.info("Connecting to HOD...");
+            log.debug("sending HOD command " + cmdToString(cmdarray));
 
-            if (server.equals("local")) {
-                p = Runtime.getRuntime().exec(cmd.toString());
-            } 
-            else {
-                SSHSocketImplFactory fac = SSHSocketImplFactory.getFactory(server);
-                p = fac.ssh(cmd.toString());
-            }
-            
-            InputStream is = p.getInputStream();
+            // setup shutdown hook to make sure we tear down hod connection
+            Runtime.getRuntime().addShutdownHook(new ShutdownThread());
 
-            log.info("Connecting to HOD...");
-            log.debug("sending HOD command " + cmd.toString());
+            hodProcess = runCommand(server, cmdarray);
+
+            // print all the information provided by HOD
+            try {
+                BufferedReader br = new BufferedReader(new InputStreamReader(hodProcess.getErrorStream()));
+                String msg;
+                while ((msg = br.readLine()) != null)
+                    log.info(msg);
+                br.close();
+            } catch(IOException ioe) {}
+
+            // for remote connection we need to bring the file locally  
+            if (!server.equals(LOCAL))
+                hodConfDir = copyHadoopConfLocally(server);
 
-            StringBuilder sb = new StringBuilder();
-            int c;
-            String hdfsUI = null;
-            String mapredUI = null;
             String hdfs = null;
             String mapred = null;
-            String hadoopConf = null;
+            String hadoopConf = hodConfDir + "/hadoop-site.xml";
 
-            ParsingState current = ParsingState.NOTHING;
+            log.info ("Hadoop configuration file: " + hadoopConf);
+
+            JobConf jobConf = new JobConf(hadoopConf);
+            jobConf.addResource("pig-cluster-hadoop-site.xml");
+            conf = new HConfiguration(jobConf);
+
+            hdfs = (String)conf.get("fs.default.name");
+            if (hdfs == null)
+                throw new ExecException("Missing fs.default.name from hadoop configuration");
+            log.info("HDFS: " + hdfs);
+
+            mapred = (String)conf.get("mapred.job.tracker");
+            if (mapred == null)
+                throw new ExecException("Missing mapred.job.tracker from hadoop configuration");
+            log.info("JobTracker: " + mapred);
 
-            while((c = is.read()) != -1 && mapred == null) {
-                if (c == '\n' || c == '\r') {
-                    switch(current) {
-                    case HDFSUI:
-                        hdfsUI = sb.toString().trim();
-                        log.info("HDFS Web UI: " + hdfsUI);
-                        break;
-                    case HDFS:
-                        hdfs = sb.toString().trim();
-                        log.info("HDFS: " + hdfs);
-                        break;
-                    case MAPREDUI:
-                        mapredUI = sb.toString().trim();
-                        log.info("JobTracker Web UI: " + mapredUI);
-                        break;
-                    case MAPRED:
-                        mapred = sb.toString().trim();
-                        log.info("JobTracker: " + mapred);
-                        break;
-                    case HADOOPCONF:
-                        hadoopConf = sb.toString().trim();
-                        log.info("HadoopConf: " + hadoopConf);
-                        break;
-                    }
-                    current = ParsingState.NOTHING;
-                    sb = new StringBuilder();
-                }
-                sb.append((char)c);
-                if (sb.indexOf("hdfsUI:") != -1) {
-                    current = ParsingState.HDFSUI;
-                    sb = new StringBuilder();
-                } 
-                else if (sb.indexOf("hdfs:") != -1) {
-                    current = ParsingState.HDFS;
-                    sb = new StringBuilder();
-                } 
-                else if (sb.indexOf("mapredUI:") != -1) {
-                    current = ParsingState.MAPREDUI;
-                    sb = new StringBuilder();
-                } 
-                else if (sb.indexOf("mapred:") != -1) {
-                    current = ParsingState.MAPRED;
-                    sb = new StringBuilder();
-                } 
-                else if (sb.indexOf("hadoopConf:") != -1) {
-                    current = ParsingState.HADOOPCONF;
-                    sb = new StringBuilder();
-                }    
-            }
-            
-            hdfsUI = fixUpDomain(hdfsUI);
             hdfs = fixUpDomain(hdfs);
-            mapredUI = fixUpDomain(mapredUI);
             mapred = fixUpDomain(mapred);
             hodHDFS = hdfs;
             hodMapRed = mapred;
 
-            if (hadoopConf != null) {
-                JobConf jobConf = new JobConf(hadoopConf);
-                jobConf.addResource("pig-cluster-hadoop-site.xml");
-                
-                conf = new HConfiguration(jobConf);
-                
-                // make sure that files on class path are used
-                System.out.println("Job Conf = " + conf);
-                System.out.println("dfs.block.size= " + conf.get("dfs.block.size"));
-                System.out.println("ipc.client.timeout= " + conf.get("ipc.client.timeout"));
-                System.out.println("mapred.child.java.opts= " + conf.get("mapred.child.java.opts"));
-            }
-            else {
-                throw new IOException("Missing Hadoop configuration file");
-            }
             return new String[] {hdfs, mapred};
         } 
         catch (Exception e) {
@@ -454,15 +409,240 @@
         }
     }
 
+    private synchronized void closeHod(String server){
+            if (hodProcess == null)
+                return;
+
+            // hod deallocate format: hod deallocate -d <conf dir>
+            String[] cmdarray = new String[4];
+			cmdarray[0] = "hod";
+            cmdarray[1] = "deallocate";
+            cmdarray[2] = "-d";
+            if (remoteHodConfDir != null)
+                cmdarray[3] = remoteHodConfDir;
+            else
+                cmdarray[3] = hodConfDir;
+
+            log.info("Disconnecting from HOD...");
+            log.debug("Disconnect command: " + cmdToString(cmdarray));
+
+            try {
+                Process p = runCommand(server, cmdarray);
+           } catch (Exception e) {
+                log.warn("Failed to disconnect from HOD; error: " + e.getMessage());
+           } finally {
+               if (remoteHodConfDir != null)
+                   deleteDir(server, remoteHodConfDir);
+               deleteDir(LOCAL, hodConfDir);
+           }
+
+           hodProcess = null;
+    }
+
+    private String copyHadoopConfLocally(String server) throws ExecException {
+        String localDir = createTempDir(LOCAL);
+        String remoteFile = new String(hodConfDir + "/hadoop-site.xml");
+        String localFile = new String(localDir + "/hadoop-site.xml");
+
+        remoteHodConfDir = hodConfDir;
+
+        String[] cmdarray = new String[2];
+        cmdarray[0] = "cat";
+        cmdarray[1] = remoteFile;
+
+        Process p = runCommand(server, cmdarray);
+
+        BufferedWriter bw;
+        try {
+            bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(localFile)));
+        } catch (Exception e){
+            throw new ExecException("Failed to create local hadoop file " + localFile, e);
+        }
+
+        try {
+            BufferedReader br = new BufferedReader(new InputStreamReader(p.getInputStream()));
+            String line;
+            while ((line = br.readLine()) != null){
+                bw.write(line, 0, line.length());
+                bw.newLine();
+            }
+            br.close();
+            bw.close();
+        } catch (Exception e){
+            throw new ExecException("Failed to copy data to local hadoop file " + localFile, e);
+        }
+
+        return localDir;
+    }
+
+    private String cmdToString(String[] cmdarray) {
+        StringBuilder cmd = new StringBuilder();
+
+        for (int i = 0; i < cmdarray.length; i++) {
+            cmd.append(cmdarray[i]);
+            cmd.append(' ');
+        }
+
+        return cmd.toString();
+    }
+    private Process runCommand(String server, String[] cmdarray) throws ExecException {
+        Process p;
+        try {
+            if (server.equals(LOCAL)) {
+                p = Runtime.getRuntime().exec(cmdarray);
+            } 
+            else {
+                SSHSocketImplFactory fac = SSHSocketImplFactory.getFactory(server);
+                p = fac.ssh(cmdToString(cmdarray));
+            }
+
+            //this should return as soon as connection is shutdown
+            int rc = p.waitFor();
+            if (rc != 0) {
+                String errMsg = new String();
+                try {
+                    BufferedReader br = new BufferedReader(new InputStreamReader(p.getErrorStream()));
+                    errMsg = br.readLine();
+                    br.close();
+                } catch (IOException ioe) {}
+                StringBuilder msg = new StringBuilder("Failed to run command ");
+                msg.append(cmdToString(cmdarray));
+                msg.append(" on server ");
+                msg.append(server);
+                msg.append("; return code: ");
+                msg.append(rc);
+                msg.append("; error: ");
+                msg.append(errMsg);
+                throw new ExecException(msg.toString());
+            }
+        } catch (Exception e){
+            throw new ExecException(e);
+        }
+
+        return p;
+    }
+
+    private void deleteDir(String server, String dir) {
+        if (server.equals(LOCAL)){
+            File path = new File(dir);
+            deleteLocalDir(path);
+        }
+        else { 
+            // send rm command over ssh
+            String[] cmdarray = new String[3];
+			cmdarray[0] = "rm";
+            cmdarray[1] = "-rf";
+            cmdarray[2] = dir;
+
+            try{
+                Process p = runCommand(server, cmdarray);
+            }catch(Exception e){
+                    log.warn("Failed to remove HOD configuration directory - " + dir);
+            }
+        }
+    }
+
+    private void deleteLocalDir(File path){
+        File[] files = path.listFiles();
+        int i;
+        for (i = 0; i < files.length; i++){
+            if (files[i].isHidden())
+                continue;
+            if (files[i].isFile())
+                files[i].delete();
+            else if (files[i].isDirectory())
+                deleteLocalDir(files[i]);
+        }
+
+        path.delete();
+    }
+
     private String fixUpDomain(String hostPort) throws UnknownHostException {
         String parts[] = hostPort.split(":");
         if (parts[0].indexOf('.') == -1) {
-            parts[0] = parts[0] + ".inktomisearch.com";
+            String domain = System.getProperty("cluster.domain");
+            if (domain == null) 
+                throw new RuntimeException("Missing cluster.domain property!");
+            parts[0] = parts[0] + "." + domain;
         }
         InetAddress.getByName(parts[0]);
         return parts[0] + ":" + parts[1];
     }
-    
+
+    // create temp dir to store hod output; removed on exit
+    // format: <tempdir>/PigHod.<host name>.<user name>.<nanosecondts>
+    private String createTempDir(String server) throws ExecException {
+        StringBuilder tempDirPrefix  = new StringBuilder ();
+        
+        if (server.equals(LOCAL))
+            tempDirPrefix.append(System.getProperty("java.io.tmpdir"));
+        else
+            // for remote access we assume /tmp as temp dir
+            tempDirPrefix.append("/tmp");
+
+        tempDirPrefix.append("/PigHod.");
+        try {
+            tempDirPrefix.append(InetAddress.getLocalHost().getHostName());
+            tempDirPrefix.append(".");
+        } catch (UnknownHostException e) {}
+            
+        tempDirPrefix.append(System.getProperty("user.name"));
+        tempDirPrefix.append(".");
+        String path;
+        do {
+            path = tempDirPrefix.toString() + System.nanoTime();
+        } while (!createDir(server, path));
+
+        return path;
+    }
+
+    private boolean createDir(String server, String dir) throws ExecException{
+        if (server.equals(LOCAL)){ 
+            // create local directory
+            File tempDir = new File(dir);
+            boolean success = tempDir.mkdir();
+            if (!success)
+                log.warn("Failed to create HOD configuration directory - " + dir + ". Retrying ...");
+
+            return success;
+        }
+        else {
+            String[] cmdarray = new String[2];
+			cmdarray[0] = "mkdir ";
+            cmdarray[1] = dir;
+
+            try{
+                Process p = runCommand(server, cmdarray);
+            }
+            catch(ExecException e){
+                    log.warn("Failed to create HOD configuration directory - " + dir + "Retrying...");
+                    return false;
+            }
+
+            return true;
+        }
+    }
+
+    // returns number of nodes based on -m option in hodParams if present;
+    // otherwise, default is used; -m is removed from the params
+    int getNumNodes(StringBuilder hodParams) {
+        String val = hodParams.toString();
+        int startPos = val.indexOf("-m ");
+        if (startPos == -1)
+            startPos = val.indexOf("-m\t");
+        if (startPos != -1) {
+            int curPos = startPos + 3;
+            int len = val.length();
+            while (curPos < len && Character.isWhitespace(val.charAt(curPos))) curPos ++;
+            int numStartPos = curPos;
+            while (curPos < len && Character.isDigit(val.charAt(curPos))) curPos ++;
+            int nodes = Integer.parseInt(val.substring(numStartPos, curPos));
+            hodParams.delete(startPos, curPos);
+            return nodes;
+        } else {
+            return Integer.getInteger("hod.nodes", 15);
+        }
+    }
 }
 
 

Modified: incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java?rev=639469&r1=639468&r2=639469&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java Thu Mar 20 14:32:22 2008
@@ -251,20 +251,24 @@
             initialized = true;
             relativeRoot = pigContext.getDfs().asContainer("/tmp/temp" + r.nextInt());
             toDelete.push(relativeRoot);
-            Runtime.getRuntime().addShutdownHook(new Thread() {
-                @Override
-                public void run() {
-                    while (!toDelete.empty()) {
-                        try {
-                            ElementDescriptor elem = toDelete.pop();
-                            elem.delete();
-                        } 
-                        catch (IOException e) {
-                            log.error(e);
-                        }
-                    }
-                }
-            });
+            // Runtime.getRuntime().addShutdownHook(new Thread() {
+              //   @Override
+            //     public void run() {
+            //          deleteTempFiles();
+            //     }
+            //});
+        }
+    }
+
+    public static void deleteTempFiles() {
+        while (!toDelete.empty()) {
+            try {
+                ElementDescriptor elem = toDelete.pop();
+                elem.delete();
+            } 
+            catch (IOException e) {
+                log.error(e);
+            }
         }
     }