You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/07/19 03:43:32 UTC

svn commit: r678090 [1/2] - in /incubator/pig/branches/types: lib-src/shock/org/apache/pig/shock/ src/org/apache/pig/ src/org/apache/pig/backend/executionengine/ src/org/apache/pig/backend/hadoop/datastorage/ src/org/apache/pig/backend/hadoop/execution...

Author: olga
Date: Fri Jul 18 18:43:31 2008
New Revision: 678090

URL: http://svn.apache.org/viewvc?rev=678090&view=rev
Log:
merge of PIG-111-215-236-18-266-291

Added:
    incubator/pig/branches/types/src/org/apache/pig/impl/util/ConfigurationValidator.java
Modified:
    incubator/pig/branches/types/lib-src/shock/org/apache/pig/shock/SSHSocketImplFactory.java
    incubator/pig/branches/types/src/org/apache/pig/Main.java
    incubator/pig/branches/types/src/org/apache/pig/PigServer.java
    incubator/pig/branches/types/src/org/apache/pig/StandAloneParser.java
    incubator/pig/branches/types/src/org/apache/pig/backend/executionengine/ExecException.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
    incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java
    incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java
    incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java
    incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java
    incubator/pig/branches/types/test/org/apache/pig/test/MiniCluster.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEval.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestCombiner.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestCompressedFiles.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestFilterOpNumeric.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestFilterOpString.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestGrunt.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestInfixArithmetic.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestInputOutputFileValidator.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestJobSubmission.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestLargeFile.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestLogToPhyCompiler.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestMapReduce.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestOrderBy.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestPi.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestPigScriptParser.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestPigServer.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestPigSplit.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestStoreOld.java
    incubator/pig/branches/types/test/org/apache/pig/test/Util.java
    incubator/pig/branches/types/test/org/apache/pig/test/utils/LogicalPlanTester.java

Modified: incubator/pig/branches/types/lib-src/shock/org/apache/pig/shock/SSHSocketImplFactory.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/lib-src/shock/org/apache/pig/shock/SSHSocketImplFactory.java?rev=678090&r1=678089&r2=678090&view=diff
==============================================================================
--- incubator/pig/branches/types/lib-src/shock/org/apache/pig/shock/SSHSocketImplFactory.java (original)
+++ incubator/pig/branches/types/lib-src/shock/org/apache/pig/shock/SSHSocketImplFactory.java Fri Jul 18 18:43:31 2008
@@ -30,6 +30,7 @@
 import java.net.SocketAddress;
 import java.net.SocketException;
 import java.net.SocketImpl;
+import java.net.SocketOptions;
 import java.net.SocketImplFactory;
 import java.net.UnknownHostException;
 import java.net.Proxy.Type;
@@ -436,7 +437,10 @@
     }
 
     public Object getOption(int optID) throws SocketException {
-        throw new SocketException("SSHSocketImpl does not implement getOption");
+        if (optID == SocketOptions.SO_SNDBUF)
+            return new Integer(1024);
+        else
+		    throw new SocketException("SSHSocketImpl does not implement getOption for " + optID);
     }
 
     /**

Modified: incubator/pig/branches/types/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/Main.java?rev=678090&r1=678089&r2=678090&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/Main.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/Main.java Fri Jul 18 18:43:31 2008
@@ -33,9 +33,12 @@
 import org.apache.log4j.PatternLayout;
 import org.apache.log4j.PropertyConfigurator;
 import org.apache.pig.ExecType;
+import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
 import org.apache.pig.impl.util.JarManager;
+import org.apache.pig.impl.util.PropertiesUtil;
 import org.apache.pig.tools.cmdline.CmdLineParser;
 import org.apache.pig.tools.grunt.Grunt;
 import org.apache.pig.tools.timer.PerformanceTimerFactory;
@@ -46,7 +49,13 @@
 
     private final static Log log = LogFactory.getLog(Main.class);
     
-private enum ExecMode {STRING, FILE, SHELL, UNKNOWN};
+    private static final String LOG4J_CONF = "log4jconf";
+    private static final String BRIEF = "brief";
+    private static final String DEBUG = "debug";
+    private static final String JAR = "jar";
+    private static final String VERBOSE = "verbose";
+    
+    private enum ExecMode {STRING, FILE, SHELL, UNKNOWN};
                 
 /**
  * The Main-Class for the Pig Jar that will provide a shell and setup a classpath appropriate
@@ -61,26 +70,17 @@
 public static void main(String args[])
 {
     int rc = 1;
-    PigContext pigContext = new PigContext();
+    Properties properties = new Properties();
+    PropertiesUtil.loadPropertiesFromFile(properties);
 
     try {
-        BufferedReader in = null;
         BufferedReader pin = null;
-        ExecMode mode = ExecMode.UNKNOWN;
-        int port = 0;
-        String file = null;
-        Level logLevel = Level.INFO;
-        boolean brief = false;
-        String log4jconf = null;
-        boolean verbose = false;
         boolean debug = false;
         boolean dryrun = false;
         ArrayList<String> params = new ArrayList<String>();
         ArrayList<String> paramFiles = new ArrayList<String>();
 
         CmdLineParser opts = new CmdLineParser(args);
-        // Don't use -l, --latest, -c, --cluster, -cp, -classpath, -D as these
-        // are masked by the startup perl script.
         opts.registerOpt('4', "log4jconf", CmdLineParser.ValueExpected.REQUIRED);
         opts.registerOpt('b', "brief", CmdLineParser.ValueExpected.NOT_ACCEPTED);
         opts.registerOpt('c', "cluster", CmdLineParser.ValueExpected.REQUIRED);
@@ -97,32 +97,48 @@
         opts.registerOpt('m', "param_file", CmdLineParser.ValueExpected.OPTIONAL);
         opts.registerOpt('r', "dryrun", CmdLineParser.ValueExpected.NOT_ACCEPTED);
 
+        ExecMode mode = ExecMode.UNKNOWN;
+        String file = null;
+        ExecType execType = ExecType.MAPREDUCE ;
+        String execTypeString = properties.getProperty("exectype");
+        if(execTypeString!=null && execTypeString.length()>0){
+            execType = PigServer.parseExecType(execTypeString);
+        }
+        String cluster = "local";
+        String clusterConfigured = properties.getProperty("cluster");
+        if(clusterConfigured != null && clusterConfigured.length() > 0){
+            cluster = clusterConfigured;
+        }
+
         char opt;
         while ((opt = opts.getNextOpt()) != CmdLineParser.EndOfOpts) {
             switch (opt) {
             case '4':
-                log4jconf = opts.getValStr();
+                String log4jconf = opts.getValStr();
+                if(log4jconf != null){
+                    properties.setProperty(LOG4J_CONF, log4jconf);
+                }
                 break;
 
             case 'b':
-                brief = true;
+                properties.setProperty(BRIEF, "true");
                 break;
 
-            case 'c': {
+            case 'c': 
                 // Needed away to specify the cluster to run the MR job on
                 // Bug 831708 - fixed
-                   String cluster = opts.getValStr();
-                   System.out.println("Changing MR cluster to " + cluster);
-                   if(cluster.indexOf(':') < 0) {
-                       cluster = cluster + ":50020";
-                   }
-                   pigContext.setJobtrackerLocation(cluster);
+                String clusterParameter = opts.getValStr();
+                if (clusterParameter != null && clusterParameter.length() > 0) {
+                    cluster = clusterParameter;
+                }
                 break;
-                      }
 
             case 'd':
+                String logLevel = opts.getValStr();
+                if (logLevel != null) {
+                    properties.setProperty(DEBUG, logLevel);
+                }
                 debug = true;
-                logLevel = Level.toLevel(opts.getValStr(), Level.INFO);
                 break;
                 
             case 'e': 
@@ -138,29 +154,26 @@
                 usage();
                 return;
 
-            case 'j': {
-                   String splits[] = opts.getValStr().split(":", -1);
-                   for (int i = 0; i < splits.length; i++) {
-                       if (splits[i].length() > 0) {
-                        pigContext.addJar(splits[i]);
-                       }
-                   }
+            case 'j': 
+                String jarsString = opts.getValStr();
+                if(jarsString != null){
+                    properties.setProperty(JAR, jarsString);
+                }
                 break;
-                      }
 
             case 'm':
                 paramFiles.add(opts.getValStr());
                 break;
                             
-            case 'o': {
-                   String gateway = System.getProperty("ssh.gateway");
-                   if (gateway == null || gateway.length() == 0) {
-                       System.setProperty("hod.server", "local");
-                   } else {
-                       System.setProperty("hod.server", System.getProperty("ssh.gateway"));
-                   }
+            case 'o': 
+                // TODO sgroschupf using system properties is always a very bad idea
+                String gateway = System.getProperty("ssh.gateway");
+                if (gateway == null || gateway.length() == 0) {
+                    properties.setProperty("hod.server", "local");
+                } else {
+                    properties.setProperty("hod.server", System.getProperty("ssh.gateway"));
+                }
                 break;
-                      }
 
             case 'p': 
                 String val = opts.getValStr();
@@ -174,17 +187,15 @@
                 break;
                             
             case 'v':
-                verbose = true;
+                properties.setProperty(VERBOSE, ""+true);
                 break;
 
             case 'x':
-                ExecType exectype;
-                   try {
-                       exectype = PigServer.parseExecType(opts.getValStr());
-                   } catch (IOException e) {
-                       throw new RuntimeException("ERROR: Unrecognized exectype.", e);
-                   }
-                   pigContext.setExecType(exectype);
+                try {
+                    execType = PigServer.parseExecType(opts.getValStr());
+                    } catch (IOException e) {
+                        throw new RuntimeException("ERROR: Unrecognized exectype.", e);
+                    }
                 break;
             case 'i':
             	System.out.println(getVersionString());
@@ -195,44 +206,16 @@
                      }
             }
         }
+        // configure logging
+        configureLog4J(properties);
+        // create the context with the parameter
+        PigContext pigContext = new PigContext(execType, properties);
 
         LogicalPlanBuilder.classloader = pigContext.createCl(null);
 
-        if (log4jconf != null) {
-            PropertyConfigurator.configure(log4jconf);
-        } else if (!brief) {
-            // non-brief logging - timestamps
-            Properties props = new Properties();
-            props.setProperty("log4j.rootLogger", "INFO, PIGCONSOLE");
-            props.setProperty("log4j.appender.PIGCONSOLE",
-                              "org.apache.log4j.ConsoleAppender");
-            props.setProperty("log4j.appender.PIGCONSOLE.layout",
-                              "org.apache.log4j.PatternLayout");
-            props.setProperty("log4j.appender.PIGCONSOLE.layout.ConversionPattern",
-                              "%d [%t] %-5p %c - %m%n");
-            PropertyConfigurator.configure(props);
-            // Set the log level/threshold
-            Logger.getRootLogger().setLevel(verbose ? Level.ALL : logLevel);
-        } else {
-            // brief logging - no timestamps
-            Properties props = new Properties();
-            props.setProperty("log4j.rootLogger", "INFO, PIGCONSOLE");
-            props.setProperty("log4j.appender.PIGCONSOLE",
-                              "org.apache.log4j.ConsoleAppender");
-            props.setProperty("log4j.appender.PIGCONSOLE.layout",
-                              "org.apache.log4j.PatternLayout");
-            props.setProperty("log4j.appender.PIGCONSOLE.layout.ConversionPattern",
-                              "%m%n");
-            PropertyConfigurator.configure(props);
-            // Set the log level/threshold
-            Logger.getRootLogger().setLevel(verbose ? Level.ALL : logLevel);
-        }
-
-        // TODO Add a file appender for the logs
-        // TODO Need to create a property in the properties file for it.
-
         // construct the parameter subsitution preprocessor
         Grunt grunt = null;
+        BufferedReader in;
         String substFile = null;
         switch (mode) {
         case FILE:
@@ -321,14 +304,65 @@
         usage();
         rc = 1;
     } catch (Throwable e) {
+        //log.error(e);
+        // this is a hack to see full error till we resolve commons logging config
         e.printStackTrace();
-        log.error(e);
     } finally {
+        // clear temp files
+        FileLocalizer.deleteTempFiles();
         PerformanceTimerFactory.getPerfTimerFactory().dumpTimers();
         System.exit(rc);
     }
 }
 
+//TODO jz: log4j.properties should be used instead
+private static void configureLog4J(Properties properties) {
+    // TODO Add a file appender for the logs
+    // TODO Need to create a property in the properties file for it.
+    // sgroschupf, 25Feb2008: this method will be obsolete with PIG-115.
+     
+    String log4jconf = properties.getProperty(LOG4J_CONF);
+    String trueString = "true";
+    boolean brief = trueString.equalsIgnoreCase(properties.getProperty(BRIEF));
+    boolean verbose = trueString.equalsIgnoreCase(properties.getProperty(VERBOSE));
+    Level logLevel = Level.INFO;
+
+    String logLevelString = properties.getProperty(DEBUG);
+    if (logLevelString != null){
+        logLevel = Level.toLevel(logLevelString, Level.INFO);
+    }
+    
+    if (log4jconf != null) {
+         PropertyConfigurator.configure(log4jconf);
+     } else if (!brief ) {
+         // non-brief logging - timestamps
+         Properties props = new Properties();
+         props.setProperty("log4j.rootLogger", "INFO, PIGCONSOLE");
+         props.setProperty("log4j.appender.PIGCONSOLE",
+                           "org.apache.log4j.ConsoleAppender");
+         props.setProperty("log4j.appender.PIGCONSOLE.layout",
+                           "org.apache.log4j.PatternLayout");
+         props.setProperty("log4j.appender.PIGCONSOLE.layout.ConversionPattern",
+                           "%d [%t] %-5p %c - %m%n");
+         PropertyConfigurator.configure(props);
+         // Set the log level/threshold
+         Logger.getRootLogger().setLevel(verbose ? Level.ALL : logLevel);
+     } else {
+         // brief logging - no timestamps
+         Properties props = new Properties();
+         props.setProperty("log4j.rootLogger", "INFO, PIGCONSOLE");
+         props.setProperty("log4j.appender.PIGCONSOLE",
+                           "org.apache.log4j.ConsoleAppender");
+         props.setProperty("log4j.appender.PIGCONSOLE.layout",
+                           "org.apache.log4j.PatternLayout");
+         props.setProperty("log4j.appender.PIGCONSOLE.layout.ConversionPattern",
+                           "%m%n");
+         PropertyConfigurator.configure(props);
+         // Set the log level/threshold
+         Logger.getRootLogger().setLevel(verbose ? Level.ALL : logLevel);
+     }
+}
+ 
 // retruns the stream of final pig script to be passed to Grunt
 private static BufferedReader runParamPreprocessor(BufferedReader origPigScript, ArrayList<String> params,
                                             ArrayList<String> paramFiles, String scriptFile, boolean createFile) 

Modified: incubator/pig/branches/types/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/PigServer.java?rev=678090&r1=678089&r2=678090&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/PigServer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/PigServer.java Fri Jul 18 18:43:31 2008
@@ -62,6 +62,7 @@
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.WrappedIOException;
+import org.apache.pig.impl.util.PropertiesUtil;
 
 
 /**
@@ -105,16 +106,23 @@
         return user + "-" + date;
     }
     
-    public PigServer(String execType) throws ExecException, IOException {
-        this(parseExecType(execType));
+    public PigServer(String execTypeString) throws ExecException, IOException {
+        this(parseExecType(execTypeString));
     }
     
+    public PigServer(ExecType execType) throws ExecException {
+        this(execType, PropertiesUtil.loadPropertiesFromFile());
+    }
+
     public PigServer() throws ExecException {
         this(ExecType.MAPREDUCE);
     }
     
-    public PigServer(ExecType execType) throws ExecException {
-        this.pigContext = new PigContext(execType);
+    public PigServer(ExecType execType, Properties properties) throws ExecException {
+        this.pigContext = new PigContext(execType, properties);
+        if (this.pigContext.getProperties().getProperty(PigContext.JOB_NAME) == null) {
+            setJobName("DefaultJobName") ;
+        }
         pigContext.connect();
     }
     
@@ -256,7 +264,7 @@
     }
 
     public void setJobName(String name){
-        pigContext.setJobName(name);
+        pigContext.getProperties().setProperty(PigContext.JOB_NAME, PigContext.JOB_NAME_PREFIX + ":" + name);
     }
     
     /**

Modified: incubator/pig/branches/types/src/org/apache/pig/StandAloneParser.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/StandAloneParser.java?rev=678090&r1=678089&r2=678090&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/StandAloneParser.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/StandAloneParser.java Fri Jul 18 18:43:31 2008
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.Iterator;
+import java.util.Properties;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
@@ -40,7 +41,7 @@
     public static void main(String args[]) throws IOException, ExecException {
         
         BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
-        pig = new PigServer(ExecType.LOCAL);
+        pig = new PigServer(ExecType.LOCAL, new Properties());
         
         while (true) {
             System.out.print("> ");

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/executionengine/ExecException.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/executionengine/ExecException.java?rev=678090&r1=678089&r2=678090&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/executionengine/ExecException.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/executionengine/ExecException.java Fri Jul 18 18:43:31 2008
@@ -26,14 +26,14 @@
     }
 
     public ExecException() {
-        this(null, null);
+        super();
     }
     
     public ExecException(String message) {
-        this(message, null);
+        super(message);
     }
     
     public ExecException(Throwable cause) {
-        this(null, cause);
+        super(cause);
     }
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java?rev=678090&r1=678089&r2=678090&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java Fri Jul 18 18:43:31 2008
@@ -32,49 +32,65 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.dfs.DistributedFileSystem;
 import org.apache.hadoop.conf.Configuration;
-
-import org.apache.pig.backend.datastorage.*;
-
+import org.apache.pig.backend.datastorage.ContainerDescriptor;
+import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.backend.datastorage.DataStorageException;
+import org.apache.pig.backend.datastorage.ElementDescriptor;
 
 public class HDataStorage implements DataStorage {
 
-    
+    private static final String FILE_SYSTEM_LOCATION = "fs.default.name";
+
     private FileSystem fs;
-    
-    public HDataStorage(URI uri, Configuration conf) throws IOException {
-        this(uri, new HConfiguration(conf));
-    }
-    
-    public HDataStorage(URI uri, HConfiguration conf) throws IOException {
-        fs = FileSystem.get(uri, conf.getConfiguration());
+    private Configuration configuration;
+    private Properties properties;
+    private URI uri;
+
+    public HDataStorage(URI uri, Properties properties) {
+        this.properties = properties;
+        this.uri = uri;
+        init();
     }
 
-    public HDataStorage(Configuration conf) throws IOException {
-        this(new HConfiguration(conf));
+    public HDataStorage(Properties properties) {
+        this.properties = properties;
+        init();
     }
-    
-    public HDataStorage(HConfiguration conf) throws IOException {
-        fs = FileSystem.get(conf.getConfiguration());
+
+    public void init() {
+        // check if the name node is set; if not, we set local as the fallback
+        String nameNode = this.properties.getProperty(FILE_SYSTEM_LOCATION);
+        if (nameNode == null || nameNode.length() == 0) {
+            nameNode = "local";
+        }
+        this.configuration = ConfigurationUtil.toConfiguration(this.properties);
+        try {
+            if (this.uri != null) {
+                this.fs = FileSystem.get(this.uri, this.configuration);
+            } else {
+                this.fs = FileSystem.get(this.configuration);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to create DataStorage", e);
+        }
+        short defaultReplication = fs.getDefaultReplication();
+        properties.setProperty(DEFAULT_REPLICATION_FACTOR_KEY, (new Short(
+                defaultReplication)).toString());
     }
 
-    public void init() { }
-    
     public void close() throws IOException {
         fs.close();
     }
     
     public Properties getConfiguration() {
-        Properties props = new HConfiguration(fs.getConf());
-                
-        short defaultReplication = fs.getDefaultReplication();
-        props.setProperty(DEFAULT_REPLICATION_FACTOR_KEY,
-                          (new Short(defaultReplication)).toString());
-        
-        return props;
+        return this.properties;
     }
-    
-    public void updateConfiguration(Properties newConfiguration) 
-            throws DataStorageException {        
+
+    public void updateConfiguration(Properties newConfiguration)
+            throws DataStorageException {
+        // TODO sgroschupf 25Feb2008 this method is never called and
+        // I'm not even sure Hadoop would support that; I doubt it.
+
         if (newConfiguration == null) {
             return;
         }
@@ -110,8 +126,7 @@
         return stats;
     }
     
-    public ElementDescriptor asElement(String name) 
-            throws DataStorageException {
+    public ElementDescriptor asElement(String name) throws DataStorageException {
         if (this.isContainer(name)) {
             return new HDirectory(this, name);
         }

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=678090&r1=678089&r2=678090&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Fri Jul 18 18:43:31 2008
@@ -18,22 +18,31 @@
 
 package org.apache.pig.backend.hadoop.executionengine;
 
+import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.FileOutputStream;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.PrintStream;
 import java.net.InetAddress;
 import java.net.Socket;
 import java.net.SocketException;
 import java.net.SocketImplFactory;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.net.UnknownHostException;
 import java.util.Collection;
 import java.util.Enumeration;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
@@ -45,7 +54,7 @@
 import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
 import org.apache.pig.backend.executionengine.ExecutionEngine;
-import org.apache.pig.backend.hadoop.datastorage.HConfiguration;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
 import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.impl.PigContext;
@@ -66,12 +75,18 @@
 
 public class HExecutionEngine implements ExecutionEngine {
     
+    private static final String HOD_SERVER = "hod.server";
+    public static final String JOB_TRACKER_LOCATION = "mapred.job.tracker";
+    private static final String FILE_SYSTEM_LOCATION = "fs.default.name";
+    
     private final Log log = LogFactory.getLog(getClass());
+    private static final String LOCAL = "local";
+    
+    private StringBuilder hodParams = null;
     
     protected PigContext pigContext;
     
     protected DataStorage ds;
-    protected HConfiguration conf;
     
     protected JobSubmissionProtocol jobTracker;
     protected JobClient jobClient;
@@ -85,10 +100,8 @@
     // map from LOGICAL key to into about the execution
     protected Map<OperatorKey, MapRedResult> materializedResults;
     
-    public HExecutionEngine(PigContext pigContext,
-                            HConfiguration conf) {
+    public HExecutionEngine(PigContext pigContext) {
         this.pigContext = pigContext;
-        this.conf = conf;
         this.logicalToPhysicalKeys = new HashMap<OperatorKey, OperatorKey>();
         this.physicalOpTable = new HashMap<OperatorKey, ExecPhysicalOperator>();
         this.materializedResults = new HashMap<OperatorKey, MapRedResult>();
@@ -108,10 +121,6 @@
         return this.materializedResults;
     }
     
-    public HExecutionEngine(PigContext pigContext) {
-        this(pigContext, new HConfiguration(new JobConf()));
-    }
-                            
     public Map<OperatorKey, ExecPhysicalOperator> getPhysicalOpTable() {
         return this.physicalOpTable;
     }
@@ -120,69 +129,85 @@
     public DataStorage getDataStorage() {
         return this.ds;
     }
-    
-    private void setJobtrackerLocation(String newLocation) {
-        conf.put("mapred.job.tracker", newLocation);
-    }
-
-    private void setFilesystemLocation(String newLocation) {
-        conf.put("fs.default.name", newLocation);
-    }
 
     public void init() throws ExecException {
+        init(this.pigContext.getProperties());
+    }
+    
+    public void init(Properties properties) throws ExecException {
         //First set the ssh socket factory
         setSSHFactory();
         
-        String hodServer = System.getProperty("hod.server");
+        String hodServer = properties.getProperty(HOD_SERVER);
+        String cluster = null;
+        String nameNode = null;
+        Configuration configuration = null;
     
         if (hodServer != null && hodServer.length() > 0) {
-            String hdfsAndMapred[] = doHod(hodServer);
-            setFilesystemLocation(hdfsAndMapred[0]);
-            setJobtrackerLocation(hdfsAndMapred[1]);
+            String hdfsAndMapred[] = doHod(hodServer, properties);
+            properties.setProperty(FILE_SYSTEM_LOCATION, hdfsAndMapred[0]);
+            properties.setProperty(JOB_TRACKER_LOCATION, hdfsAndMapred[1]);
         }
         else {
-            String cluster = System.getProperty("cluster");
+            
+            // We need to build a configuration object first in the manner described below
+            // and then get back a properties object to inspect the JOB_TRACKER_LOCATION
+            // and FILE_SYSTEM_LOCATION. The reason to do this is if we looked only at
+            // the existing properties object, we may not get the right settings. So we want
+            // to read the configurations in the order specified below and only then look
+            // for JOB_TRACKER_LOCATION and FILE_SYSTEM_LOCATION.
+            
+            // Hadoop by default specifies two resources, loaded in-order from the classpath:
+            // 1. hadoop-default.xml : Read-only defaults for hadoop.
+            // 2. hadoop-site.xml: Site-specific configuration for a given hadoop installation.
+            // Now add the settings from "properties" object to override any existing properties
+            // All of the above is accomplished in the method call below
+            configuration = ConfigurationUtil.toConfiguration(properties);            
+            properties = ConfigurationUtil.toProperties(configuration);
+            cluster = properties.getProperty(JOB_TRACKER_LOCATION);
+            nameNode = properties.getProperty(FILE_SYSTEM_LOCATION);
+            
             if (cluster != null && cluster.length() > 0) {
-                if(cluster.indexOf(':') < 0) {
+                if(!cluster.contains(":") && !cluster.equalsIgnoreCase(LOCAL)) {
                     cluster = cluster + ":50020";
                 }
-                setJobtrackerLocation(cluster);
+                properties.setProperty(JOB_TRACKER_LOCATION, cluster);
             }
 
-            String nameNode = System.getProperty("namenode");
             if (nameNode!=null && nameNode.length() > 0) {
-                if(nameNode.indexOf(':') < 0) {
+                if(!nameNode.contains(":")  && !nameNode.equalsIgnoreCase(LOCAL)) {
                     nameNode = nameNode + ":8020";
                 }
-                setFilesystemLocation(nameNode);
+                properties.setProperty(FILE_SYSTEM_LOCATION, nameNode);
             }
         }
      
-        log.info("Connecting to hadoop file system at: " + conf.get("fs.default.name"));
-
-        try {
-            ds = new HDataStorage(conf);
-        }
-        catch (IOException e) {
-            throw new ExecException("Failed to create DataStorage", e);
-        }
-        
-        String jobTrackerName = conf.get("mapred.job.tracker").toString();
-        log.info("Connecting to map-reduce job tracker at: " + jobTrackerName);
+        log.info("Connecting to hadoop file system at: "  + (nameNode==null? LOCAL: nameNode) )  ;
+        ds = new HDataStorage(properties);
+                
+        // The above HDataStorage constructor sets DEFAULT_REPLICATION_FACTOR_KEY in properties.
+        // So we need to reconstruct the configuration object for the non HOD case
+        // In the HOD case, this is the first time the configuration object will be created
+        configuration = ConfigurationUtil.toConfiguration(properties);
         
-        try {
-            if(!jobTrackerName.equalsIgnoreCase("local"))
-                jobTracker = (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
-                                                              JobSubmissionProtocol.versionID, 
-                                                              JobTracker.getAddress(conf.getConfiguration()),
-                                                              conf.getConfiguration());
-        }
-        catch (IOException e) {
-            throw new ExecException("Failed to crate job tracker", e);
+            
+        if(cluster != null && !cluster.equalsIgnoreCase(LOCAL)){
+                log.info("Connecting to map-reduce job tracker at: " + properties.get(JOB_TRACKER_LOCATION));
+                if (!LOCAL.equalsIgnoreCase(cluster)) {
+                try {
+                    jobTracker = (JobSubmissionProtocol) RPC.getProxy(
+                            JobSubmissionProtocol.class,
+                            JobSubmissionProtocol.versionID, JobTracker
+                                    .getAddress(configuration), configuration);
+                } catch (IOException e) {
+                    throw new ExecException("Failed to crate job tracker", e);
+                }
+            }
         }
 
         try {
-            jobClient = new JobClient(new JobConf(conf.getConfiguration()));
+            // Set job-specific configuration knobs
+            jobClient = new JobClient(new JobConf(configuration));
         }
         catch (IOException e) {
             throw new ExecException("Failed to create job client", e);
@@ -190,26 +215,16 @@
     }
 
     public void close() throws ExecException {
-        ;
+        closeHod(pigContext.getProperties().getProperty("hod.server"));
     }
         
     public Properties getConfiguration() throws ExecException {
-        return this.conf;
+        return this.pigContext.getProperties();
     }
         
     public void updateConfiguration(Properties newConfiguration) 
             throws ExecException {
-        Enumeration keys = newConfiguration.propertyNames();
-        
-        while (keys.hasMoreElements()) {
-            Object obj = keys.nextElement();
-            
-            if (obj instanceof String) {
-                String str = (String) obj;
-                
-                conf.put(str, newConfiguration.get(str));
-            }
-        }
+        init(newConfiguration);
     }
         
     public Map<String, Object> getStatistics() throws ExecException {
@@ -305,7 +320,8 @@
     }
     
     private void setSSHFactory(){
-        String g = System.getProperty("ssh.gateway");
+        Properties properties = this.pigContext.getProperties();
+        String g = properties.getProperty("ssh.gateway");
         if (g == null || g.length() == 0) return;
         try {
             Class clazz = Class.forName("org.apache.pig.shock.SSHSocketImplFactory");
@@ -321,136 +337,98 @@
     //To prevent doing hod if the pig server is constructed multiple times
     private static String hodMapRed;
     private static String hodHDFS;
+    private String hodConfDir = null; 
+    private String remoteHodConfDir = null; 
+    private Process hodProcess = null;
 
-    private enum ParsingState {
-        NOTHING, HDFSUI, MAPREDUI, HDFS, MAPRED, HADOOPCONF
-    };
+    class ShutdownThread extends Thread{
+        public synchronized void run() {
+            closeHod(pigContext.getProperties().getProperty("hod.server"));
+        }
+    }
     
-    private String[] doHod(String server) throws ExecException {
+    private String[] doHod(String server, Properties properties) throws ExecException {
         if (hodMapRed != null) {
             return new String[] {hodHDFS, hodMapRed};
         }
         
         try {
-            Process p = null;
-            // Make the kryptonite released version the default if nothing
-            // else is specified.
-            StringBuilder cmd = new StringBuilder();
-            cmd.append(System.getProperty("hod.expect.root"));
-            cmd.append('/');
-            cmd.append("libexec/pig/");
-            cmd.append(System.getProperty("hod.expect.uselatest"));
-            cmd.append('/');
-            cmd.append(System.getProperty("hod.command"));
-
-            String cluster = System.getProperty("yinst.cluster");
-           
-            // TODO This is a Yahoo specific holdover, need to remove
-            // this.
-            if (cluster != null && cluster.length() > 0 && !cluster.startsWith("kryptonite")) {
-                cmd.append(" --config=");
-                cmd.append(System.getProperty("hod.config.dir"));
-                cmd.append('/');
-                cmd.append(cluster);
-            }
+            // first, create a temp directory to store the configuration
+            hodConfDir = createTempDir(server);
+			
+            // jz: fall back to the system property because this is not handled in Main
+            hodParams = new StringBuilder(properties.getProperty(
+                    "hod.param", System.getProperty("hod.param", "")));
+            // get the number of nodes out of the command or use default
+            int nodes = getNumNodes(hodParams);
+
+            // command format: hod allocate -d <cluster_dir> -n <number_of_nodes> <other params>
+            String[] fixedCmdArray = new String[] { "hod", "allocate", "-d",
+                                       hodConfDir, "-n", Integer.toString(nodes) };
+            String[] extraParams = hodParams.toString().split(" ");
+    
+            String[] cmdarray = new String[fixedCmdArray.length + extraParams.length];
+            System.arraycopy(fixedCmdArray, 0, cmdarray, 0, fixedCmdArray.length);
+            System.arraycopy(extraParams, 0, cmdarray, fixedCmdArray.length, extraParams.length);
+
+            log.info("Connecting to HOD...");
+            log.debug("sending HOD command " + cmdToString(cmdarray));
 
-            cmd.append(" " + System.getProperty("hod.param", ""));
+            // setup shutdown hook to make sure we tear down hod connection
+            Runtime.getRuntime().addShutdownHook(new ShutdownThread());
 
-            if (server.equals("local")) {
-                p = Runtime.getRuntime().exec(cmd.toString());
-            } 
-            else {
-                SSHSocketImplFactory fac = SSHSocketImplFactory.getFactory(server);
-                p = fac.ssh(cmd.toString());
-            }
-            
-            InputStream is = p.getInputStream();
+            hodProcess = runCommand(server, cmdarray);
 
-            log.info("Connecting to HOD...");
-            log.debug("sending HOD command " + cmd.toString());
+            // print all the information provided by HOD
+            try {
+                BufferedReader br = new BufferedReader(new InputStreamReader(hodProcess.getErrorStream()));
+                String msg;
+                while ((msg = br.readLine()) != null)
+                    log.info(msg);
+                br.close();
+            } catch(IOException ioe) {}
+
+            // for remote connection we need to bring the file locally  
+            if (!server.equals(LOCAL))
+                hodConfDir = copyHadoopConfLocally(server);
 
-            StringBuffer sb = new StringBuffer();
-            int c;
-            String hdfsUI = null;
-            String mapredUI = null;
             String hdfs = null;
             String mapred = null;
-            String hadoopConf = null;
+            String hadoopConf = hodConfDir + "/hadoop-site.xml";
 
-            ParsingState current = ParsingState.NOTHING;
+            log.info ("Hadoop configuration file: " + hadoopConf);
 
-            while((c = is.read()) != -1 && mapred == null) {
-                if (c == '\n' || c == '\r') {
-                    switch(current) {
-                    case HDFSUI:
-                        hdfsUI = sb.toString().trim();
-                        log.info("HDFS Web UI: " + hdfsUI);
-                        break;
-                    case HDFS:
-                        hdfs = sb.toString().trim();
-                        log.info("HDFS: " + hdfs);
-                        break;
-                    case MAPREDUI:
-                        mapredUI = sb.toString().trim();
-                        log.info("JobTracker Web UI: " + mapredUI);
-                        break;
-                    case MAPRED:
-                        mapred = sb.toString().trim();
-                        log.info("JobTracker: " + mapred);
-                        break;
-                    case HADOOPCONF:
-                        hadoopConf = sb.toString().trim();
-                        log.info("HadoopConf: " + hadoopConf);
-                        break;
-                    }
-                    current = ParsingState.NOTHING;
-                    sb = new StringBuffer();
-                }
-                sb.append((char)c);
-                if (sb.indexOf("hdfsUI:") != -1) {
-                    current = ParsingState.HDFSUI;
-                    sb = new StringBuffer();
-                } 
-                else if (sb.indexOf("hdfs:") != -1) {
-                    current = ParsingState.HDFS;
-                    sb = new StringBuffer();
-                } 
-                else if (sb.indexOf("mapredUI:") != -1) {
-                    current = ParsingState.MAPREDUI;
-                    sb = new StringBuffer();
-                } 
-                else if (sb.indexOf("mapred:") != -1) {
-                    current = ParsingState.MAPRED;
-                    sb = new StringBuffer();
-                } 
-                else if (sb.indexOf("hadoopConf:") != -1) {
-                    current = ParsingState.HADOOPCONF;
-                    sb = new StringBuffer();
-                }    
-            }
-            
-            hdfsUI = fixUpDomain(hdfsUI);
-            hdfs = fixUpDomain(hdfs);
-            mapredUI = fixUpDomain(mapredUI);
-            mapred = fixUpDomain(mapred);
+            JobConf jobConf = new JobConf(hadoopConf);
+            jobConf.addResource("pig-cluster-hadoop-site.xml");
+
+			// We need to load the properties from the hadoop configuration
+			// file we just found in the hod dir.  We want these to override
+			// any existing properties we have.
+        	if (jobConf != null) {
+            	Iterator<Map.Entry<String, String>> iter = jobConf.iterator();
+            	while (iter.hasNext()) {
+                	Map.Entry<String, String> entry = iter.next();
+                	properties.put(entry.getKey(), entry.getValue());
+            	}
+        	}
+
+            hdfs = properties.getProperty(FILE_SYSTEM_LOCATION);
+            if (hdfs == null)
+                throw new ExecException("Missing fs.default.name from hadoop configuration");
+            log.info("HDFS: " + hdfs);
+
+            mapred = properties.getProperty(JOB_TRACKER_LOCATION);
+            if (mapred == null)
+                throw new ExecException("Missing mapred.job.tracker from hadoop configuration");
+            log.info("JobTracker: " + mapred);
+
+            // this is no longer needed as the hadoop-site.xml given to us by HOD
+            // contains data in the correct format
+            // hdfs = fixUpDomain(hdfs, properties);
+            // mapred = fixUpDomain(mapred, properties);
             hodHDFS = hdfs;
             hodMapRed = mapred;
 
-            if (hadoopConf != null) {
-                JobConf jobConf = new JobConf(hadoopConf);
-                jobConf.addResource("pig-cluster-hadoop-site.xml");
-                
-                conf = new HConfiguration(jobConf);
-                
-                // make sure that files on class path are used
-                System.out.println("Job Conf = " + conf);
-                System.out.println("dfs.block.size= " + conf.get("dfs.block.size"));
-                System.out.println("ipc.client.timeout= " + conf.get("ipc.client.timeout"));
-                System.out.println("mapred.child.java.opts= " + conf.get("mapred.child.java.opts"));
-            }
-            else {
-                throw new IOException("Missing Hadoop configuration file");
-            }
             return new String[] {hdfs, mapred};
         } 
         catch (Exception e) {
@@ -460,13 +438,117 @@
         }
     }
 
-    private String fixUpDomain(String hostPort) throws UnknownHostException {
-        String parts[] = hostPort.split(":");
-        if (parts[0].indexOf('.') == -1) {
-            parts[0] = parts[0] + ".inktomisearch.com";
+    private synchronized void closeHod(String server){
+            if (hodProcess == null)
+                return;
+
+            // hod deallocate format: hod deallocate -d <conf dir>
+            String[] cmdarray = new String[4];
+			cmdarray[0] = "hod";
+            cmdarray[1] = "deallocate";
+            cmdarray[2] = "-d";
+            if (remoteHodConfDir != null)
+                cmdarray[3] = remoteHodConfDir;
+            else
+                cmdarray[3] = hodConfDir;
+            
+            log.info("Disconnecting from HOD...");
+            log.debug("Disconnect command: " + cmdToString(cmdarray));
+
+            try {
+                Process p = runCommand(server, cmdarray);
+           } catch (Exception e) {
+                log.warn("Failed to disconnect from HOD; error: " + e.getMessage());
+           } finally {
+               if (remoteHodConfDir != null)
+                   deleteDir(server, remoteHodConfDir);
+               deleteDir(LOCAL, hodConfDir);
+           }
+
+           hodProcess = null;
+    }
+
+    private String copyHadoopConfLocally(String server) throws ExecException {
+        String localDir = createTempDir(LOCAL);
+        String remoteFile = new String(hodConfDir + "/hadoop-site.xml");
+        String localFile = new String(localDir + "/hadoop-site.xml");
+
+        remoteHodConfDir = hodConfDir;
+
+        String[] cmdarray = new String[2];
+        cmdarray[0] = "cat";
+        cmdarray[1] = remoteFile;
+
+        Process p = runCommand(server, cmdarray);
+
+        BufferedWriter bw;
+        try {
+            bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(localFile)));
+        } catch (Exception e){
+            throw new ExecException("Failed to create local hadoop file " + localFile, e);
+        }
+
+        try {
+            BufferedReader br = new BufferedReader(new InputStreamReader(p.getInputStream()));
+            String line;
+            while ((line = br.readLine()) != null){
+                bw.write(line, 0, line.length());
+                bw.newLine();
+            }
+            br.close();
+            bw.close();
+        } catch (Exception e){
+            throw new ExecException("Failed to copy data to local hadoop file " + localFile, e);
+        }
+
+        return localDir;
+    }
+
+    private String cmdToString(String[] cmdarray) {
+        StringBuilder cmd = new StringBuilder();
+
+        for (int i = 0; i < cmdarray.length; i++) {
+            cmd.append(cmdarray[i]);
+            cmd.append(' ');
+        }
+
+        return cmd.toString();
+    }
+    private Process runCommand(String server, String[] cmdarray) throws ExecException {
+        Process p;
+        try {
+            if (server.equals(LOCAL)) {
+                p = Runtime.getRuntime().exec(cmdarray);
+            } 
+            else {
+                SSHSocketImplFactory fac = SSHSocketImplFactory.getFactory(server);
+                p = fac.ssh(cmdToString(cmdarray));
+            }
+
+            //this should return as soon as connection is shutdown
+            int rc = p.waitFor();
+            if (rc != 0) {
+                String errMsg = new String();
+                try {
+                    BufferedReader br = new BufferedReader(new InputStreamReader(p.getErrorStream()));
+                    errMsg = br.readLine();
+                    br.close();
+                } catch (IOException ioe) {}
+                StringBuilder msg = new StringBuilder("Failed to run command ");
+                msg.append(cmdToString(cmdarray));
+                msg.append(" on server ");
+                msg.append(server);
+                msg.append("; return code: ");
+                msg.append(rc);
+                msg.append("; error: ");
+                msg.append(errMsg);
+                throw new ExecException(msg.toString());
+            }
+        } catch (Exception e){
+            throw new ExecException(e);
         }
-        InetAddress.getByName(parts[0]);
-        return parts[0] + ":" + parts[1];
+
+        return p;
     }
 
     private FileSpec checkLeafIsStore(PhysicalPlan plan) throws ExecException {
@@ -491,8 +573,145 @@
             throw new ExecException(e);
         }
     }
+    private void deleteDir(String server, String dir) {
+        if (server.equals(LOCAL)){
+            File path = new File(dir);
+            deleteLocalDir(path);
+        }
+        else { 
+            // send rm command over ssh
+            String[] cmdarray = new String[3];
+			cmdarray[0] = "rm";
+            cmdarray[1] = "-rf";
+            cmdarray[2] = dir;
+
+            try{
+                Process p = runCommand(server, cmdarray);
+            }catch(Exception e){
+                    log.warn("Failed to remove HOD configuration directory - " + dir);
+            }
+        }
+    }
+
+    private void deleteLocalDir(File path){
+        File[] files = path.listFiles();
+        int i;
+        for (i = 0; i < files.length; i++){
+            if (files[i].isHidden())
+                continue;
+            if (files[i].isFile())
+                files[i].delete();
+            else if (files[i].isDirectory())
+                deleteLocalDir(files[i]);
+        }
+
+        path.delete();
+    }
+
+    private String fixUpDomain(String hostPort,Properties properties) throws UnknownHostException {
+        URI uri = null;
+        try {
+            uri = new URI(hostPort);
+        } catch (URISyntaxException use) {
+            throw new RuntimeException("Illegal hostPort: " + hostPort);
+        }
+        
+        String hostname = uri.getHost();
+        int port = uri.getPort();
+        
+        // Parse manually if hostPort is not a hierarchical (non-opaque) URI,
+        // e.g. hostPort is "myhost:myport"
+        if (hostname == null || port == -1) {
+            String parts[] = hostPort.split(":");
+            hostname = parts[0];
+            port = Integer.valueOf(parts[1]);
+        }
+        
+        if (hostname.indexOf('.') == -1) {
+          // jz: fall back to the system property because this is not handled in Main
+            String domain = properties.getProperty("cluster.domain",System.getProperty("cluster.domain"));
+            if (domain == null) 
+                throw new RuntimeException("Missing cluster.domain property!");
+            hostname = hostname + "." + domain;
+        }
+        InetAddress.getByName(hostname);
+        return hostname + ":" + Integer.toString(port);
+    }
+
+    // create temp dir to store hod output; removed on exit
+    // format: <tempdir>/PigHod.<host name>.<user name>.<nanosecondts>
+    private String createTempDir(String server) throws ExecException {
+        StringBuilder tempDirPrefix  = new StringBuilder ();
+        
+        if (server.equals(LOCAL))
+            tempDirPrefix.append(System.getProperty("java.io.tmpdir"));
+        else
+            // for remote access we assume /tmp as temp dir
+            tempDirPrefix.append("/tmp");
+
+        tempDirPrefix.append("/PigHod.");
+        try {
+            tempDirPrefix.append(InetAddress.getLocalHost().getHostName());
+            tempDirPrefix.append(".");
+        } catch (UnknownHostException e) {}
+            
+        tempDirPrefix.append(System.getProperty("user.name"));
+        tempDirPrefix.append(".");
+        String path;
+        do {
+            path = tempDirPrefix.toString() + System.nanoTime();
+        } while (!createDir(server, path));
+
+        return path;
+    }
+
+    private boolean createDir(String server, String dir) throws ExecException{
+        if (server.equals(LOCAL)){ 
+            // create local directory
+            File tempDir = new File(dir);
+            boolean success = tempDir.mkdir();
+            if (!success)
+                log.warn("Failed to create HOD configuration directory - " + dir + ". Retrying ...");
+
+            return success;
+        }
+        else {
+            String[] cmdarray = new String[2];
+			cmdarray[0] = "mkdir ";
+            cmdarray[1] = dir;
+
+            try{
+                Process p = runCommand(server, cmdarray);
+            }
+            catch(ExecException e){
+                    log.warn("Failed to create HOD configuration directory - " + dir + "Retrying...");
+                    return false;
+            }
+
+            return true;
+        }
+    }
 
-   
+    // returns number of nodes based on -m option in hodParams if present;
+    // otherwise, default is used; -m is removed from the params
+    int getNumNodes(StringBuilder hodParams) {
+        String val = hodParams.toString();
+        int startPos = val.indexOf("-m ");
+        if (startPos == -1)
+            startPos = val.indexOf("-m\t");
+        if (startPos != -1) {
+            int curPos = startPos + 3;
+            int len = val.length();
+            while (curPos < len && Character.isWhitespace(val.charAt(curPos))) curPos ++;
+            int numStartPos = curPos;
+            while (curPos < len && Character.isDigit(val.charAt(curPos))) curPos ++;
+            int nodes = Integer.parseInt(val.substring(numStartPos, curPos));
+            hodParams.delete(startPos, curPos);
+            return nodes;
+        } else {
+            return Integer.getInteger("hod.nodes", 15);
+        }
+    }
 }
 
 

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java?rev=678090&r1=678089&r2=678090&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java Fri Jul 18 18:43:31 2008
@@ -94,13 +94,12 @@
     }
         
     public Properties getConfiguration() throws ExecException {
-        Properties conf = new Properties();
-        return conf;
+        return this.pigContext.getProperties();
     }
         
     public void updateConfiguration(Properties newConfiguration) 
         throws ExecException {
-        ;
+        // there is nothing to do here.
     }
         
     public Map<String, Object> getStatistics() throws ExecException {

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java?rev=678090&r1=678089&r2=678090&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/PigContext.java Fri Jul 18 18:43:31 2008
@@ -63,7 +63,8 @@
     
     private transient final Log log = LogFactory.getLog(getClass());
     
-    private static final String JOB_NAME_PREFIX= "PigLatin";
+    public static final String JOB_NAME = "jobName";
+    public static final String JOB_NAME_PREFIX= "PigLatin";
     
     /* NOTE: we only serialize some of the stuff 
      * 
@@ -75,7 +76,7 @@
     private ExecType execType;;    
 
     //  configuration for connecting to hadoop
-    transient private Properties conf = new Properties();
+    private Properties conf = new Properties();
     
     //  extra jar files that are needed to run a job
     transient public List<URL> extraJars = new LinkedList<URL>();              
@@ -94,6 +95,8 @@
    
     private String jobName = JOB_NAME_PREFIX;    // can be overwritten by users
   
+    private Properties properties;
+    
     /**
      * a table mapping function names to function specs.
      */
@@ -101,16 +104,16 @@
     
     private static ArrayList<String> packageImportList = new ArrayList<String>();
 
-    public boolean                       debug       = true;
+    public boolean debug = true;
     
     public PigContext() {
-        this(ExecType.MAPREDUCE);
+        this(ExecType.MAPREDUCE, new Properties());
     }
         
-    public PigContext(ExecType execType){
+    public PigContext(ExecType execType, Properties properties){
         this.execType = execType;
+        this.properties = properties;   
 
-        initProperties();
         String pigJar = JarManager.findContainingJar(Main.class);
         String hadoopJar = JarManager.findContainingJar(FileSystem.class);
         if (pigJar != null) {
@@ -128,48 +131,14 @@
         packageImportList.add("com.yahoo.pig.yst.sds.ULT.");
         packageImportList.add("org.apache.pig.impl.builtin.");        
     }
-
-    private void initProperties() {
-        Properties fileProperties = new Properties();
-            
-        try{        
-            // first read the properties in the jar file
-            InputStream pis = MapReduceLauncher.class.getClassLoader().getResourceAsStream("properties");
-            if (pis != null) {
-                fileProperties.load(pis);
-            }
-            
-            //then read the properties in the home directory
-            try{
-                pis = new FileInputStream(System.getProperty("user.home") + "/.pigrc");
-            }catch(IOException e){}
-            if (pis != null) {
-                fileProperties.load(pis);
-            }
-        }catch (IOException e){
-            log.error(e);
-            throw new RuntimeException(e);
-        }
-        
-        //Now set these as system properties only if they are not already defined.
-        for (Object o: fileProperties.keySet()){
-            String propertyName = (String)o;
-            log.debug("Found system property " + propertyName + " in .pigrc"); 
-            if (System.getProperty(propertyName) == null){
-                System.setProperty(propertyName, fileProperties.getProperty(propertyName));
-                log.debug("Setting system property " + propertyName);
-            }
-        }
-    }    
     
     public void connect() throws ExecException {
-        try {
-            switch (execType) {
 
+        switch (execType) {
             case LOCAL:
             {
                 lfs = new HDataStorage(URI.create("file:///"),
-                                       new Configuration());
+                                       new Properties());
                 
                 dfs = lfs;
                 executionEngine = new LocalExecutionEngine(this);
@@ -185,7 +154,7 @@
                 dfs = executionEngine.getDataStorage();
                 
                 lfs = new HDataStorage(URI.create("file:///"),
-                        new Configuration());                
+                                        new Properties());                
             }
             break;
             
@@ -193,19 +162,8 @@
             {
                 throw new ExecException("Unkown execType: " + execType);
             }
-            }
         }
-        catch (IOException e) {
-            ;
-        }
-    }
-
-    public void setJobName(String name){
-    jobName = JOB_NAME_PREFIX + ":" + name;
-    }
 
-    public String getJobName(){
-    return jobName;
     }
 
     public void setJobtrackerLocation(String newLocation) {
@@ -278,7 +236,7 @@
             throw WrappedIOException.wrap("Unable to copy " + src + " to " + dst + (localDst ? "locally" : ""), e);
         }
         
-        srcElement.copy(dstElement, conf,false);
+        srcElement.copy(dstElement, this.properties, false);
     }
     
     public ExecutionEngine getExecutionEngine() {
@@ -293,8 +251,21 @@
         return lfs;
     }
 
+    /**
+     * Returns the properties object held by this context.
+     *
+     * @return the configuration properties used to connect to the
+     *         execution engine (the live object, not a copy)
+     */
+    public Properties getProperties() {
+        return this.properties;
+    }
+    
+    /**
+     * @deprecated use {@link #getProperties()} instead
+     */
     public Properties getConf() {
-        return conf;
+        return getProperties();
     }
 
     /**

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java?rev=678090&r1=678089&r2=678090&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java Fri Jul 18 18:43:31 2008
@@ -29,6 +29,7 @@
 import java.util.Iterator;
 import java.util.Random;
 import java.util.Stack;
+import java.util.Properties ;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -41,6 +42,7 @@
 import org.apache.pig.backend.hadoop.datastorage.HDataStorage;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.util.WrappedIOException;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 
 public class FileLocalizer {
     private static final Log log = LogFactory.getLog(FileLocalizer.class);
@@ -156,8 +158,8 @@
     }
     */
 
-    public static InputStream openDFSFile(String fileName, JobConf conf) throws IOException{
-        DataStorage dds = new HDataStorage(conf);
+    public static InputStream openDFSFile(String fileName, Properties properties) throws IOException{
+        DataStorage dds = new HDataStorage(properties);
         ElementDescriptor elem = dds.asElement(fileName);
         return openDFSFile(elem);
     }
@@ -318,20 +320,24 @@
             initialized = true;
             relativeRoot = pigContext.getDfs().asContainer("/tmp/temp" + r.nextInt());
             toDelete.push(relativeRoot);
-            Runtime.getRuntime().addShutdownHook(new Thread() {
-                @Override
-                public void run() {
-                    while (!toDelete.empty()) {
-                        try {
-                            ElementDescriptor elem = toDelete.pop();
-                            elem.delete();
-                        } 
-                        catch (IOException e) {
-                            log.error(e);
-                        }
-                    }
-                }
-            });
+            // Runtime.getRuntime().addShutdownHook(new Thread() {
+              //   @Override
+            //     public void run() {
+            //          deleteTempFiles();
+            //     }
+            //});
+        }
+    }
+
+    public static void deleteTempFiles() {
+        while (!toDelete.empty()) {
+            try {
+                ElementDescriptor elem = toDelete.pop();
+                elem.delete();
+            } 
+            catch (IOException e) {
+                log.error(e);
+            }
         }
     }
 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java?rev=678090&r1=678089&r2=678090&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/MapReduceLauncher.java Fri Jul 18 18:43:31 2008
@@ -12,6 +12,7 @@
 import org.apache.hadoop.mapred.jobcontrol.JobControl;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.executionengine.ExecutionEngine;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.datastorage.HConfiguration;
 import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
 import org.apache.pig.impl.PigContext;
@@ -41,7 +42,7 @@
         comp.compile();
         
         ExecutionEngine exe = pc.getExecutionEngine();
-        Configuration conf = ((HConfiguration)exe.getConfiguration()).getConfiguration();
+        Configuration conf = ConfigurationUtil.toConfiguration(exe.getConfiguration());
         JobClient jobClient = ((HExecutionEngine)exe).getJobClient();
 
         MROperPlan mrp = comp.getMRPlan();

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java?rev=678090&r1=678089&r2=678090&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java Fri Jul 18 18:43:31 2008
@@ -27,6 +27,7 @@
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Partitioner;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.data.DataBag;
@@ -52,7 +53,7 @@
             throw new RuntimeException("Sort paritioner used but no quantiles found");
         
         try{
-            InputStream is = FileLocalizer.openDFSFile(quantilesFile,job);
+            InputStream is = FileLocalizer.openDFSFile(quantilesFile,ConfigurationUtil.toProperties(job));
             BinStorage loader = new BinStorage();
             loader.bindTo(quantilesFile, new BufferedPositionedInputStream(is), 0, Long.MAX_VALUE);
             

Added: incubator/pig/branches/types/src/org/apache/pig/impl/util/ConfigurationValidator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/util/ConfigurationValidator.java?rev=678090&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/util/ConfigurationValidator.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/util/ConfigurationValidator.java Fri Jul 18 18:43:31 2008
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.util;
+
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/** Checks Pig configuration values and replaces missing or malformed ones with defaults. */
+public class ConfigurationValidator {
+    private static final Log log = LogFactory.getLog(ConfigurationValidator.class);
+
+    /**
+     * Validates all Pig properties in place; every Pig setting should pass through here before use.
+     *
+     * @param properties the configuration to validate; missing or invalid entries are overwritten
+     */
+    public static void validatePigProperties(Properties properties) {
+        ensureLongType(properties, "pig.spill.size.threshold", 0L);
+        ensureLongType(properties, "pig.spill.gc.activation.size", Long.MAX_VALUE);
+    }
+
+    /** Ensures {@code key} holds a value parsable as a long, storing {@code defaultValue} otherwise. */
+    private static void ensureLongType(Properties properties, String key, long defaultValue) {
+        String str = properties.getProperty(key);
+        if (str == null) {
+            // Property absent: fall back to the default.
+            properties.setProperty(key, Long.toString(defaultValue));
+            return;
+        }
+        try {
+            Long.parseLong(str);
+        } catch (NumberFormatException nfe) {
+            // Name the offending key so the bad setting can be located, then use the default.
+            log.error(key + "=" + str + " has to be parsable to long; using " + defaultValue);
+            properties.setProperty(key, Long.toString(defaultValue));
+        }
+    }
+}

Modified: incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java?rev=678090&r1=678089&r2=678090&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java Fri Jul 18 18:43:31 2008
@@ -110,7 +110,7 @@
         
         mDfs = mPigServer.getPigContext().getDfs();
         mLfs = mPigServer.getPigContext().getLfs();
-        mConf = mPigServer.getPigContext().getConf();
+        mConf = mPigServer.getPigContext().getProperties();
         
         // TODO: this violates the abstraction layer decoupling between
         // front end and back end and needs to be changed.

Modified: incubator/pig/branches/types/test/org/apache/pig/test/MiniCluster.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/MiniCluster.java?rev=678090&r1=678089&r2=678090&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/MiniCluster.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/MiniCluster.java Fri Jul 18 18:43:31 2008
@@ -18,6 +18,7 @@
 package org.apache.pig.test;
 
 import java.io.*;
+import java.util.Properties;
 
 import org.apache.hadoop.dfs.MiniDFSCluster;
 import org.apache.hadoop.mapred.MiniMRCluster;
@@ -25,6 +26,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 
 /**
  * This class builds a single instance of itself with the Singleton 
@@ -98,5 +100,9 @@
         }
         if (m_dfs != null) { m_dfs.shutdown(); }
         if (m_mr != null) { m_mr.shutdown(); }        
+	}
+
+    public Properties getProperties() {
+        return ConfigurationUtil.toProperties(m_conf);
     }
 }

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEval.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEval.java?rev=678090&r1=678089&r2=678090&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEval.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEval.java Fri Jul 18 18:43:31 2008
@@ -24,8 +24,10 @@
 
 import junit.framework.TestCase;
 
+import org.junit.Before;
 import org.junit.Test;
 
+import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.DataByteArray;
@@ -34,12 +36,21 @@
 
 public class TestAlgebraicEval extends TestCase {
     
-    private String initString = "mapreduce";
+    private int LOOP_COUNT = 1024;
+
+
+    private PigServer pig;
+    
+    @Before
+    @Override
+    protected void setUp() throws Exception {
+        pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+    }
+    
+
     MiniCluster cluster = MiniCluster.buildCluster();
     @Test
     public void testGroupCountWithMultipleFields() throws Throwable {
-        int LOOP_COUNT = 1024;
-        PigServer pig = new PigServer(initString);
         File tmpFile = File.createTempFile("test", "txt");
         PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
         for(int i = 0; i < LOOP_COUNT; i++) {
@@ -69,8 +80,6 @@
     
     @Test
     public void testSimpleCount() throws Exception {
-        long LOOP_COUNT = 1024;
-        PigServer pig = new PigServer(initString);
         File tmpFile = File.createTempFile("test", "txt");
         PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
         for(int i = 0; i < LOOP_COUNT; i++) {
@@ -89,8 +98,6 @@
 
     @Test
     public void testGroupCount() throws Throwable {
-        long LOOP_COUNT = 1024;
-        PigServer pig = new PigServer(initString);
         File tmpFile = File.createTempFile("test", "txt");
         PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
         for(int i = 0; i < LOOP_COUNT; i++) {
@@ -109,8 +116,6 @@
     
     @Test
     public void testGroupReorderCount() throws Throwable {
-        long LOOP_COUNT = 1024;
-        PigServer pig = new PigServer(initString);
         File tmpFile = File.createTempFile("test", "txt");
         PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
         for(int i = 0; i < LOOP_COUNT; i++) {
@@ -131,8 +136,6 @@
 
     @Test
     public void testGroupUniqueColumnCount() throws Throwable {
-        int LOOP_COUNT = 1024;
-        PigServer pig = new PigServer(initString);
         File tmpFile = File.createTempFile("test", "txt");
         PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
         long groupsize = 0;
@@ -160,8 +163,6 @@
 
     @Test
     public void testGroupDuplicateColumnCount() throws Throwable {
-        int LOOP_COUNT = 1024;
-        PigServer pig = new PigServer(initString);
         File tmpFile = File.createTempFile("test", "txt");
         PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
         long groupsize = 0;

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestCombiner.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestCombiner.java?rev=678090&r1=678089&r2=678090&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestCombiner.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestCombiner.java Fri Jul 18 18:43:31 2008
@@ -24,10 +24,12 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Properties;
 
 import org.junit.Test;
 import junit.framework.TestCase;
 
+import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.Tuple;
@@ -41,14 +43,14 @@
     @Test
     public void testOnCluster() throws Exception {
         // run the test on cluster        
-        runTest(new PigServer("mapreduce"));
+        runTest(new PigServer(ExecType.MAPREDUCE, cluster.getProperties()));
 
     }
 
     @Test
     public void testLocal() throws Exception {
         // run the test locally
-        runTest(new PigServer("local"));
+        runTest(new PigServer(ExecType.LOCAL, new Properties()));
     }
 
     

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestCompressedFiles.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestCompressedFiles.java?rev=678090&r1=678089&r2=678090&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestCompressedFiles.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestCompressedFiles.java Fri Jul 18 18:43:31 2008
@@ -28,6 +28,7 @@
 import org.junit.Test;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.builtin.DIFF;
 import junit.framework.TestCase;
@@ -72,7 +73,7 @@
     
     @Test
     public void testCompressed1() throws Throwable {
-        PigServer pig = new PigServer("mapreduce");
+        PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
         pig.registerQuery("A = foreach (cogroup (load 'file:"+gzFile+"') by $1, (load 'file:"+datFile + "') by $1) generate flatten( " + DIFF.class.getName() + "($1.$1,$2.$1)) ;");
         Iterator it = pig.openIterator("A");
         boolean success = true;

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java?rev=678090&r1=678089&r2=678090&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestEvalPipeline.java Fri Jul 18 18:43:31 2008
@@ -35,6 +35,7 @@
 import org.junit.Test;
 
 import org.apache.pig.EvalFunc;
+import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.builtin.BinStorage;
@@ -48,14 +49,16 @@
 
 public class TestEvalPipeline extends TestCase {
     
-    String initString = "mapreduce";
     MiniCluster cluster = MiniCluster.buildCluster();
+    private PigServer pigServer;
 
     TupleFactory mTf = TupleFactory.getInstance();
     
     @Before
-    public void setUp(){
+    @Override
+    public void setUp() throws Exception{
         FileLocalizer.setR(new Random());
+        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
     }
     
     static public class MyBagFunction extends EvalFunc<DataBag>{
@@ -84,7 +87,6 @@
     
     @Test
     public void testFunctionInsideFunction() throws Exception{
-        PigServer pigServer = new PigServer(initString);
         
         File f1 = createFile(new String[]{"a:1","b:1","a:1"});
 
@@ -100,8 +102,7 @@
     
     @Test
     public void testJoin() throws Exception{
-        PigServer pigServer = new PigServer(initString);
-        
+                
         File f1 = createFile(new String[]{"a:1","b:1","a:1"});
         File f2 = createFile(new String[]{"b","b","a"});
         
@@ -122,7 +123,6 @@
     
     @Test
     public void testDriverMethod() throws Exception{
-        PigServer pigServer = new PigServer(initString);
         File f = File.createTempFile("tmp", "");
         PrintWriter pw = new PrintWriter(f);
         pw.println("a");
@@ -145,7 +145,6 @@
     
     @Test
     public void testMapLookup() throws Exception {
-        PigServer pigServer = new PigServer(initString);
         DataBag b = BagFactory.getInstance().newDefaultBag();
         Map<Object, Object> colors = new HashMap<Object, Object>();
         colors.put("apple","red");
@@ -246,7 +245,6 @@
     
     @Test
     public void testBagFunctionWithFlattening() throws Exception{
-        PigServer pigServer = new PigServer(initString);
         File queryLogFile = createFile(
                     new String[]{ 
                         "stanford\tdeer\tsighting",
@@ -312,18 +310,17 @@
         }
         ps.close(); 
         
-        PigServer pig = new PigServer(initString);
-        String tmpOutputFile = FileLocalizer.getTemporaryPath(null, pig.getPigContext()).toString();
-        pig.registerQuery("A = LOAD 'file:" + tmpFile + "';");
+        String tmpOutputFile = FileLocalizer.getTemporaryPath(null, pigServer.getPigContext()).toString();
+        pigServer.registerQuery("A = LOAD 'file:" + tmpFile + "';");
         if (eliminateDuplicates){
-            pig.registerQuery("B = DISTINCT (FOREACH A GENERATE $0) PARALLEL 10;");
+            pigServer.registerQuery("B = DISTINCT (FOREACH A GENERATE $0) PARALLEL 10;");
         }else{
-            pig.registerQuery("B = ORDER A BY $0 PARALLEL 10;");
+            pigServer.registerQuery("B = ORDER A BY $0 PARALLEL 10;");
         }
-        pig.store("B", tmpOutputFile);
+        pigServer.store("B", tmpOutputFile);
         
-        pig.registerQuery("A = load '" + tmpOutputFile + "';");
-        Iterator<Tuple> iter = pig.openIterator("A");
+        pigServer.registerQuery("A = load '" + tmpOutputFile + "';");
+        Iterator<Tuple> iter = pigServer.openIterator("A");
         String last = "";
         HashSet<Integer> seen = new HashSet<Integer>();
         if(!iter.hasNext()) fail("No Results obtained");
@@ -356,11 +353,10 @@
         }
         ps.close();
 
-        PigServer pig = new PigServer(initString);
         String tmpOutputFile = FileLocalizer.getTemporaryPath(null, 
-        pig.getPigContext()).toString();
-        pig.registerQuery("A = LOAD 'file:" + tmpFile + "';");
-        pig.registerQuery("B = group A by $0;");
+        pigServer.getPigContext()).toString();
+        pigServer.registerQuery("A = LOAD 'file:" + tmpFile + "';");
+        pigServer.registerQuery("B = group A by $0;");
         String query = "C = foreach B {"
         + "C1 = filter A by $0 > -1;"
         + "C2 = distinct C1;"
@@ -368,8 +364,8 @@
         + "generate (int)group," + Identity.class.getName() +"(*), COUNT(C2), SUM(C2.$1)," +  TitleNGrams.class.getName() + "(C3), MAX(C3.$1);"
         + "};";
 
-        pig.registerQuery(query);
-        Iterator<Tuple> iter = pig.openIterator("C");
+        pigServer.registerQuery(query);
+        Iterator<Tuple> iter = pigServer.openIterator("C");
         if(!iter.hasNext()) fail("No output found");
         int numIdentity = 0;
         while(iter.hasNext()){

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestFilterOpNumeric.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestFilterOpNumeric.java?rev=678090&r1=678089&r2=678090&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestFilterOpNumeric.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestFilterOpNumeric.java Fri Jul 18 18:43:31 2008
@@ -22,10 +22,12 @@
 import java.io.PrintStream;
 import java.util.Iterator;
 
+import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.Tuple;
@@ -39,10 +41,17 @@
     private static int LOOP_COUNT = 1024;
     private String initString = "mapreduce";
     MiniCluster cluster = MiniCluster.buildCluster();
+    private PigServer pig;
     
+    @Before
+    @Override
+    protected void setUp() throws Exception {
+        pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+    }
+
+
     @Test
     public void testNumericEq() throws Throwable {
-        PigServer pig = new PigServer(initString);
         File tmpFile = File.createTempFile("test", "txt");
         PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
         for(int i = 0; i < LOOP_COUNT; i++) {
@@ -73,7 +82,6 @@
 
     @Test
     public void testNumericNeq() throws Throwable {
-        PigServer pig = new PigServer(initString);
         File tmpFile = File.createTempFile("test", "txt");
         PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
         for(int i = 0; i < LOOP_COUNT; i++) {
@@ -100,7 +108,6 @@
 
     @Test
     public void testNumericGt() throws Throwable {
-        PigServer pig = new PigServer(initString);
         File tmpFile = File.createTempFile("test", "txt");
         PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
         for(int i = 0; i < LOOP_COUNT; i++) {
@@ -130,7 +137,6 @@
 
     @Test
     public void testBinCond() throws Throwable {
-        PigServer pig = new PigServer(initString);
         File tmpFile = File.createTempFile("test", "txt");
         PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
         for(int i = 0; i < LOOP_COUNT; i++) {
@@ -159,7 +165,6 @@
     
     @Test
     public void testNestedBinCond() throws Throwable {
-        PigServer pig = new PigServer(initString);
         File tmpFile = File.createTempFile("test", "txt");
         PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
         for(int i = 0; i < LOOP_COUNT; i++) {
@@ -185,7 +190,6 @@
     
     @Test 
     public void testNumericLt() throws Throwable {
-        PigServer pig = new PigServer(initString);
         File tmpFile = File.createTempFile("test", "txt");
         PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
         for(int i = 0; i < LOOP_COUNT; i++) {
@@ -215,7 +219,6 @@
     
     @Test
     public void testNumericGte() throws Throwable {
-        PigServer pig = new PigServer(initString);
         File tmpFile = File.createTempFile("test", "txt");
         PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
         for(int i = 0; i < LOOP_COUNT; i++) {
@@ -245,7 +248,6 @@
 
     @Test
     public void testNumericLte() throws Throwable {
-        PigServer pig = new PigServer(initString);
         File tmpFile = File.createTempFile("test", "txt");
         PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
         for(int i = 0; i < LOOP_COUNT; i++) {