Posted to commits@pig.apache.org by ol...@apache.org on 2009/10/30 01:28:42 UTC

svn commit: r831169 - in /hadoop/pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plan...

Author: olga
Date: Fri Oct 30 00:28:41 2009
New Revision: 831169

URL: http://svn.apache.org/viewvc?rev=831169&view=rev
Log:
PIG-1059: FINDBUGS: remaining Bad practice + Multithreaded correctness Warning (olgan)

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/PigWarning.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
    hadoop/pig/trunk/src/org/apache/pig/backend/local/datastorage/LocalPath.java
    hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/BinaryStorage.java
    hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java
    hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java
    hadoop/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java
    hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java
    hadoop/pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java
    hadoop/pig/trunk/src/org/apache/pig/data/SingleTupleBag.java
    hadoop/pig/trunk/src/org/apache/pig/data/SortedDataBag.java
    hadoop/pig/trunk/src/org/apache/pig/data/TargetedTuple.java
    hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
    hadoop/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java
    hadoop/pig/trunk/test/findbugsExcludeFile.xml

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=831169&r1=831168&r2=831169&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Oct 30 00:28:41 2009
@@ -26,6 +26,8 @@
 
 IMPROVEMENTS
 
+PIG-1059: FINDBUGS: remaining Bad practice + Multithreaded correctness Warning (olgan)
+
 PIG-953: Enable merge join in pig to work with loaders and store functions
 which can internally index sorted data (pradeepkth)
 

Modified: hadoop/pig/trunk/src/org/apache/pig/PigWarning.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigWarning.java?rev=831169&r1=831168&r2=831169&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigWarning.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigWarning.java Fri Oct 30 00:28:41 2009
@@ -60,5 +60,6 @@
     UNREACHABLE_CODE_BOTH_MAP_AND_REDUCE_PLANS_PROCESSED,
     USING_OVERLOADED_FUNCTION,
     REDUCER_COUNT_LOW,
-    NULL_COUNTER_COUNT;
+    NULL_COUNTER_COUNT,
+    DELETE_FAILED;
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=831169&r1=831168&r2=831169&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Fri Oct 30 00:28:41 2009
@@ -80,15 +80,12 @@
 
 public class HExecutionEngine implements ExecutionEngine {
     
-    private static final String HOD_SERVER = "hod.server";
     public static final String JOB_TRACKER_LOCATION = "mapred.job.tracker";
     private static final String FILE_SYSTEM_LOCATION = "fs.default.name";
     
     private final Log log = LogFactory.getLog(getClass());
     private static final String LOCAL = "local";
     
-    private StringBuilder hodParams = null;
-    
     protected PigContext pigContext;
     
     protected DataStorage ds;
@@ -141,57 +138,48 @@
         //First set the ssh socket factory
         setSSHFactory();
         
-        String hodServer = properties.getProperty(HOD_SERVER);
         String cluster = null;
         String nameNode = null;
         Configuration configuration = null;
     
-        if (hodServer != null && hodServer.length() > 0) {
-            String hdfsAndMapred[] = doHod(hodServer, properties);
-            properties.setProperty(FILE_SYSTEM_LOCATION, hdfsAndMapred[0]);
-            properties.setProperty(JOB_TRACKER_LOCATION, hdfsAndMapred[1]);
-        }
-        else {
-            
-            // We need to build a configuration object first in the manner described below
-            // and then get back a properties object to inspect the JOB_TRACKER_LOCATION
-            // and FILE_SYSTEM_LOCATION. The reason to do this is if we looked only at
-            // the existing properties object, we may not get the right settings. So we want
-            // to read the configurations in the order specified below and only then look
-            // for JOB_TRACKER_LOCATION and FILE_SYSTEM_LOCATION.
+        // We need to build a configuration object first in the manner described below
+        // and then get back a properties object to inspect the JOB_TRACKER_LOCATION
+        // and FILE_SYSTEM_LOCATION. The reason to do this is if we looked only at
+        // the existing properties object, we may not get the right settings. So we want
+        // to read the configurations in the order specified below and only then look
+        // for JOB_TRACKER_LOCATION and FILE_SYSTEM_LOCATION.
             
-            // Hadoop by default specifies two resources, loaded in-order from the classpath:
-            // 1. hadoop-default.xml : Read-only defaults for hadoop.
-            // 2. hadoop-site.xml: Site-specific configuration for a given hadoop installation.
-            // Now add the settings from "properties" object to override any existing properties
-            // All of the above is accomplished in the method call below
+        // Hadoop by default specifies two resources, loaded in-order from the classpath:
+        // 1. hadoop-default.xml : Read-only defaults for hadoop.
+        // 2. hadoop-site.xml: Site-specific configuration for a given hadoop installation.
+        // Now add the settings from "properties" object to override any existing properties
+        // All of the above is accomplished in the method call below
            
-            JobConf jobConf = new JobConf();
-            jobConf.addResource("pig-cluster-hadoop-site.xml");
+        JobConf jobConf = new JobConf();
+        jobConf.addResource("pig-cluster-hadoop-site.xml");
             
-            //the method below alters the properties object by overriding the
-            //hadoop properties with the values from properties and recomputing
-            //the properties
-            recomputeProperties(jobConf, properties);
+        //the method below alters the properties object by overriding the
+        //hadoop properties with the values from properties and recomputing
+        //the properties
+        recomputeProperties(jobConf, properties);
             
-            configuration = ConfigurationUtil.toConfiguration(properties);            
-            properties = ConfigurationUtil.toProperties(configuration);
-            cluster = properties.getProperty(JOB_TRACKER_LOCATION);
-            nameNode = properties.getProperty(FILE_SYSTEM_LOCATION);
+        configuration = ConfigurationUtil.toConfiguration(properties);            
+        properties = ConfigurationUtil.toProperties(configuration);
+        cluster = properties.getProperty(JOB_TRACKER_LOCATION);
+        nameNode = properties.getProperty(FILE_SYSTEM_LOCATION);
             
-            if (cluster != null && cluster.length() > 0) {
-                if(!cluster.contains(":") && !cluster.equalsIgnoreCase(LOCAL)) {
-                    cluster = cluster + ":50020";
-                }
-                properties.setProperty(JOB_TRACKER_LOCATION, cluster);
+        if (cluster != null && cluster.length() > 0) {
+            if(!cluster.contains(":") && !cluster.equalsIgnoreCase(LOCAL)) {
+                cluster = cluster + ":50020";
             }
+            properties.setProperty(JOB_TRACKER_LOCATION, cluster);
+        }
 
-            if (nameNode!=null && nameNode.length() > 0) {
-                if(!nameNode.contains(":")  && !nameNode.equalsIgnoreCase(LOCAL)) {
-                    nameNode = nameNode + ":8020";
-                }
-                properties.setProperty(FILE_SYSTEM_LOCATION, nameNode);
+        if (nameNode!=null && nameNode.length() > 0) {
+            if(!nameNode.contains(":")  && !nameNode.equalsIgnoreCase(LOCAL)) {
+                nameNode = nameNode + ":8020";
             }
+            properties.setProperty(FILE_SYSTEM_LOCATION, nameNode);
         }
      
         log.info("Connecting to hadoop file system at: "  + (nameNode==null? LOCAL: nameNode) )  ;
@@ -218,10 +206,6 @@
         }
     }
 
-    public void close() throws ExecException {
-        closeHod(pigContext.getProperties().getProperty("hod.server"));
-    }
-        
     public Properties getConfiguration() throws ExecException {
         return this.pigContext.getProperties();
     }
@@ -231,6 +215,8 @@
         init(newConfiguration);
     }
         
+    public void close() throws ExecException {}
+
     public Map<String, Object> getStatistics() throws ExecException {
         throw new UnsupportedOperationException();
     }
@@ -335,409 +321,6 @@
         }
     }
 
-    //To prevent doing hod if the pig server is constructed multiple times
-    private static String hodMapRed;
-    private static String hodHDFS;
-    private String hodConfDir = null; 
-    private String remoteHodConfDir = null; 
-    private Process hodProcess = null;
-
-    class ShutdownThread extends Thread{
-        public synchronized void run() {
-            closeHod(pigContext.getProperties().getProperty("hod.server"));
-        }
-    }
-    
-    private String[] doHod(String server, Properties properties) throws ExecException {
-        if (hodMapRed != null) {
-            return new String[] {hodHDFS, hodMapRed};
-        }
-        
-            // first, create temp director to store the configuration
-            hodConfDir = createTempDir(server);
-			
-            //jz: fallback to systemproperty cause this not handled in Main
-            hodParams = new StringBuilder(properties.getProperty(
-                    "hod.param", System.getProperty("hod.param", "")));
-            // get the number of nodes out of the command or use default
-            int nodes = getNumNodes(hodParams);
-
-            // command format: hod allocate - d <cluster_dir> -n <number_of_nodes> <other params>
-            String[] fixedCmdArray = new String[] { "hod", "allocate", "-d",
-                                       hodConfDir, "-n", Integer.toString(nodes) };
-            String[] extraParams = hodParams.toString().split(" ");
-    
-            String[] cmdarray = new String[fixedCmdArray.length + extraParams.length];
-            System.arraycopy(fixedCmdArray, 0, cmdarray, 0, fixedCmdArray.length);
-            System.arraycopy(extraParams, 0, cmdarray, fixedCmdArray.length, extraParams.length);
-
-            log.info("Connecting to HOD...");
-            log.debug("sending HOD command " + cmdToString(cmdarray));
-
-            // setup shutdown hook to make sure we tear down hod connection
-            Runtime.getRuntime().addShutdownHook(new ShutdownThread());
-
-            runCommand(server, cmdarray, true);
-
-            // print all the information provided by HOD
-            try {
-                BufferedReader br = new BufferedReader(new InputStreamReader(hodProcess.getErrorStream()));
-                String msg;
-                while ((msg = br.readLine()) != null)
-                    log.info(msg);
-                br.close();
-            } catch(IOException ioe) {}
-
-            // for remote connection we need to bring the file locally  
-            if (!server.equals(LOCAL))
-                hodConfDir = copyHadoopConfLocally(server);
-
-            String hdfs = null;
-            String mapred = null;
-            String hadoopConf = hodConfDir + "/hadoop-site.xml";
-
-            log.info ("Hadoop configuration file: " + hadoopConf);
-
-            JobConf jobConf = new JobConf(hadoopConf);
-            jobConf.addResource("pig-cluster-hadoop-site.xml");
-
-            //the method below alters the properties object by overriding the
-            //hod properties with the values from properties and recomputing
-            //the properties
-            recomputeProperties(jobConf, properties);
-            
-            hdfs = properties.getProperty(FILE_SYSTEM_LOCATION);
-            if (hdfs == null) {
-                int errCode = 4007;
-                String msg = "Missing fs.default.name from hadoop configuration.";
-                throw new ExecException(msg, errCode, PigException.USER_ENVIRONMENT);
-            }
-            log.info("HDFS: " + hdfs);
-
-            mapred = properties.getProperty(JOB_TRACKER_LOCATION);
-            if (mapred == null) {
-                int errCode = 4007;
-                String msg = "Missing mapred.job.tracker from hadoop configuration";
-                throw new ExecException(msg, errCode, PigException.USER_ENVIRONMENT);
-            }
-            log.info("JobTracker: " + mapred);
-
-            // this is not longer needed as hadoop-site.xml given to us by HOD
-            // contains data in the correct format
-            // hdfs = fixUpDomain(hdfs, properties);
-            // mapred = fixUpDomain(mapred, properties);
-            hodHDFS = hdfs;
-            hodMapRed = mapred;
-
-            return new String[] {hdfs, mapred};
-    }
-
-    private synchronized void closeHod(String server){
-            if (hodProcess == null){
-                // just cleanup the dir if it exists and return
-                if (hodConfDir != null)
-                    deleteDir(server, hodConfDir);
-                return;
-            }
-
-            // hod deallocate format: hod deallocate -d <conf dir>
-            String[] cmdarray = new String[4];
-			cmdarray[0] = "hod";
-            cmdarray[1] = "deallocate";
-            cmdarray[2] = "-d";
-            if (remoteHodConfDir != null)
-                cmdarray[3] = remoteHodConfDir;
-            else
-                cmdarray[3] = hodConfDir;
-            
-            log.info("Disconnecting from HOD...");
-            log.debug("Disconnect command: " + cmdToString(cmdarray));
-
-            try {
-                runCommand(server, cmdarray, false);
-           } catch (Exception e) {
-                log.warn("Failed to disconnect from HOD; error: " + e.getMessage());
-                hodProcess.destroy();
-           } finally {
-               if (remoteHodConfDir != null){
-                   deleteDir(server, remoteHodConfDir);
-                   if (hodConfDir != null)
-                       deleteDir(LOCAL, hodConfDir);
-               }else
-                   deleteDir(server, hodConfDir);
-           }
-
-           hodProcess = null;
-    }
-
-    private String copyHadoopConfLocally(String server) throws ExecException {
-        String localDir = createTempDir(LOCAL);
-        String remoteFile = hodConfDir + "/hadoop-site.xml";
-        String localFile = localDir + "/hadoop-site.xml";
-
-        remoteHodConfDir = hodConfDir;
-
-        String[] cmdarray = new String[2];
-        cmdarray[0] = "cat";
-        cmdarray[1] = remoteFile;
-
-        Process p = runCommand(server, cmdarray, false);
-
-        BufferedWriter bw;
-        try {
-            bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(localFile)));
-        } catch (Exception e){
-            int errCode = 4008;
-            String msg = "Failed to create local hadoop file " + localFile;
-            throw new ExecException(msg, errCode, PigException.USER_ENVIRONMENT, e);
-        }
-
-        try {
-            BufferedReader br = new BufferedReader(new InputStreamReader(p.getInputStream()));
-            String line;
-            while ((line = br.readLine()) != null){
-                bw.write(line, 0, line.length());
-                bw.newLine();
-            }
-            br.close();
-            bw.close();
-        } catch (Exception e){
-            int errCode = 4009;
-            String msg = "Failed to copy data to local hadoop file " + localFile;
-            throw new ExecException(msg, errCode, PigException.USER_ENVIRONMENT, e);
-        }
-
-        return localDir;
-    }
-
-    private String cmdToString(String[] cmdarray) {
-        StringBuilder cmd = new StringBuilder();
-
-        for (int i = 0; i < cmdarray.length; i++) {
-            cmd.append(cmdarray[i]);
-            cmd.append(' ');
-        }
-
-        return cmd.toString();
-    }
-    private Process runCommand(String server, String[] cmdarray, boolean connect) throws ExecException {
-        Process p;
-        try {
-            if (server.equals(LOCAL)) {
-                p = Runtime.getRuntime().exec(cmdarray);
-            } 
-            else {
-                SSHSocketImplFactory fac = SSHSocketImplFactory.getFactory(server);
-                p = fac.ssh(cmdToString(cmdarray));
-            }
-
-            if (connect)
-                hodProcess = p;
-
-            //this should return as soon as connection is shutdown
-            int rc = p.waitFor();
-            if (rc != 0) {
-                StringBuilder errMsg = new StringBuilder();
-                try {
-                    BufferedReader br = new BufferedReader(new InputStreamReader(p.getInputStream()));
-                    String line = null;
-                    while((line = br.readLine()) != null) {
-                        errMsg.append(line);
-                    }
-                    br.close();
-                    br = new BufferedReader(new InputStreamReader(p.getErrorStream()));
-                    line = null;
-                    while((line = br.readLine()) != null) {
-                        errMsg.append(line);
-                    }
-                    br.close();
-                } catch (IOException ioe) {}
-                int errCode = 6011;
-                StringBuilder msg = new StringBuilder("Failed to run command ");
-                msg.append(cmdToString(cmdarray));
-                msg.append(" on server ");
-                msg.append(server);
-                msg.append("; return code: ");
-                msg.append(rc);
-                msg.append("; error: ");
-                msg.append(errMsg.toString());
-                throw new ExecException(msg.toString(), errCode, PigException.REMOTE_ENVIRONMENT);
-            }
-        } catch (Exception e){
-            if(e instanceof ExecException) throw (ExecException)e;
-            int errCode = 6012;
-            String msg = "Unable to run command: " + cmdToString(cmdarray) + " on server " + server;
-            throw new ExecException(msg, errCode, PigException.REMOTE_ENVIRONMENT, e);
-        }
-
-        return p;
-    }
-
-    /*
-    private FileSpec checkLeafIsStore(PhysicalPlan plan) throws ExecException {
-        try {
-            PhysicalOperator leaf = (PhysicalOperator)plan.getLeaves().get(0);
-            FileSpec spec = null;
-            if(!(leaf instanceof POStore)){
-                String scope = leaf.getOperatorKey().getScope();
-                POStore str = new POStore(new OperatorKey(scope,
-                    NodeIdGenerator.getGenerator().getNextNodeId(scope)));
-                str.setPc(pigContext);
-                spec = new FileSpec(FileLocalizer.getTemporaryPath(null,
-                    pigContext).toString(),
-                    new FuncSpec(BinStorage.class.getName()));
-                str.setSFile(spec);
-                plan.addAsLeaf(str);
-            } else{
-                spec = ((POStore)leaf).getSFile();
-            }
-            return spec;
-        } catch (Exception e) {
-            throw new ExecException(e);
-        }
-    }
-    */
-
-    private void deleteDir(String server, String dir) {
-        if (server.equals(LOCAL)){
-            File path = new File(dir);
-            deleteLocalDir(path);
-        }
-        else { 
-            // send rm command over ssh
-            String[] cmdarray = new String[3];
-			cmdarray[0] = "rm";
-            cmdarray[1] = "-rf";
-            cmdarray[2] = dir;
-
-            try{
-                runCommand(server, cmdarray, false);
-            }catch(Exception e){
-                    log.warn("Failed to remove HOD configuration directory - " + dir);
-            }
-        }
-    }
-
-    private void deleteLocalDir(File path){
-        File[] files = path.listFiles();
-        int i;
-        for (i = 0; i < files.length; i++){
-            if (files[i].isHidden())
-                continue;
-            if (files[i].isFile())
-                files[i].delete();
-            else if (files[i].isDirectory())
-                deleteLocalDir(files[i]);
-        }
-
-        path.delete();
-    }
-
-    private String fixUpDomain(String hostPort,Properties properties) throws UnknownHostException {
-        URI uri = null;
-        try {
-            uri = new URI(hostPort);
-        } catch (URISyntaxException use) {
-            throw new RuntimeException("Illegal hostPort: " + hostPort);
-        }
-        
-        String hostname = uri.getHost();
-        int port = uri.getPort();
-        
-        // Parse manually if hostPort wasn't non-opaque URI
-        // e.g. hostPort is "myhost:myport"
-        if (hostname == null || port == -1) {
-            String parts[] = hostPort.split(":");
-            hostname = parts[0];
-            port = Integer.valueOf(parts[1]);
-        }
-        
-        if (hostname.indexOf('.') == -1) {
-          //jz: fallback to systemproperty cause this not handled in Main 
-            String domain = properties.getProperty("cluster.domain",System.getProperty("cluster.domain"));
-            if (domain == null) 
-                throw new RuntimeException("Missing cluster.domain property!");
-            hostname = hostname + "." + domain;
-        }
-        InetAddress.getByName(hostname);
-        return hostname + ":" + Integer.toString(port);
-    }
-
-    // create temp dir to store hod output; removed on exit
-    // format: <tempdir>/PigHod.<host name>.<user name>.<nanosecondts>
-    private String createTempDir(String server) throws ExecException {
-        StringBuilder tempDirPrefix  = new StringBuilder ();
-        
-        if (server.equals(LOCAL))
-            tempDirPrefix.append(System.getProperty("java.io.tmpdir"));
-        else
-            // for remote access we assume /tmp as temp dir
-            tempDirPrefix.append("/tmp");
-
-        tempDirPrefix.append("/PigHod.");
-        try {
-            tempDirPrefix.append(InetAddress.getLocalHost().getHostName());
-            tempDirPrefix.append(".");
-        } catch (UnknownHostException e) {}
-            
-        tempDirPrefix.append(System.getProperty("user.name"));
-        tempDirPrefix.append(".");
-        String path;
-        do {
-            path = tempDirPrefix.toString() + System.nanoTime();
-        } while (!createDir(server, path));
-
-        return path;
-    }
-
-    private boolean createDir(String server, String dir) throws ExecException{
-        if (server.equals(LOCAL)){ 
-            // create local directory
-            File tempDir = new File(dir);
-            boolean success = tempDir.mkdir();
-            if (!success)
-                log.warn("Failed to create HOD configuration directory - " + dir + ". Retrying ...");
-
-            return success;
-        }
-        else {
-            String[] cmdarray = new String[2];
-			cmdarray[0] = "mkdir ";
-            cmdarray[1] = dir;
-
-            try{
-                runCommand(server, cmdarray, false);
-            }
-            catch(ExecException e){
-                    log.warn("Failed to create HOD configuration directory - " + dir + "Retrying...");
-                    return false;
-            }
-
-            return true;
-        }
-    }
-
-    // returns number of nodes based on -m option in hodParams if present;
-    // otherwise, default is used; -m is removed from the params
-    int getNumNodes(StringBuilder hodParams) {
-        String val = hodParams.toString();
-        int startPos = val.indexOf("-m ");
-        if (startPos == -1)
-            startPos = val.indexOf("-m\t");
-        if (startPos != -1) {
-            int curPos = startPos + 3;
-            int len = val.length();
-            while (curPos < len && Character.isWhitespace(val.charAt(curPos))) curPos ++;
-            int numStartPos = curPos;
-            while (curPos < len && Character.isDigit(val.charAt(curPos))) curPos ++;
-            int nodes = Integer.parseInt(val.substring(numStartPos, curPos));
-            hodParams.delete(startPos, curPos);
-            return nodes;
-        } else {
-            return Integer.getInteger("hod.nodes", 15);
-        }
-    }
-    
     /**
      * Method to recompute pig properties by overriding hadoop properties
      * with pig properties
@@ -763,20 +346,7 @@
                 hadoopProperties.put(key, val);
             }
             
-            //clear user defined properties and re-populate
-            properties.clear();
-            Enumeration<Object> hodPropertiesIter = hadoopProperties.keys();
-            while (hodPropertiesIter.hasMoreElements()) {
-                String key = (String) hodPropertiesIter.nextElement();
-                String val = hadoopProperties.getProperty(key);
-                properties.put(key, val);
-            }
-
         }
     }
     
 }
-
-
-
-
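
[Note: the comments retained in the init() hunk above describe how the configuration layers stack. A minimal sketch of that layering, assuming the Hadoop 0.20-era JobConf API; the class and method names here are illustrative, not Pig code:]

    import java.util.Map;
    import java.util.Properties;
    import org.apache.hadoop.mapred.JobConf;

    public class ConfigLayeringSketch {
        // Mirrors the order in HExecutionEngine.init(): hadoop-default.xml and
        // hadoop-site.xml load from the classpath when JobConf is constructed,
        // the cluster override file is added next, and user-supplied properties
        // are applied last so they win.
        static Properties layer(Properties userProps) {
            JobConf jobConf = new JobConf();                    // defaults + site config
            jobConf.addResource("pig-cluster-hadoop-site.xml"); // cluster-level override
            for (String name : userProps.stringPropertyNames()) {
                jobConf.set(name, userProps.getProperty(name)); // user properties win
            }
            Properties merged = new Properties();
            for (Map.Entry<String, String> e : jobConf) {       // Configuration is Iterable
                merged.setProperty(e.getKey(), e.getValue());
            }
            // Same defaulting as the patch: a bare cluster name gets the
            // conventional JobTracker port appended.
            String cluster = merged.getProperty("mapred.job.tracker");
            if (cluster != null && cluster.length() > 0
                    && !cluster.contains(":") && !cluster.equalsIgnoreCase("local")) {
                merged.setProperty("mapred.job.tracker", cluster + ":50020");
            }
            return merged;
        }
    }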

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java?rev=831169&r1=831168&r2=831169&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java Fri Oct 30 00:28:41 2009
@@ -273,16 +273,6 @@
     }
 
     @Override
-    public void visitCogroup(POCogroup cogroup) {
-        cogroup.setParentPlan(parent);
-    }
-
-    @Override
-    public void visitSplit(org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POSplit split) {
-        split.setParentPlan(parent);
-    }
-
-    @Override
     public void visitLocalRearrangeForIllustrate(
             POLocalRearrangeForIllustrate lrfi) throws VisitorException {
         super.visitLocalRearrangeForIllustrate(lrfi);

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java?rev=831169&r1=831168&r2=831169&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java Fri Oct 30 00:28:41 2009
@@ -204,12 +204,6 @@
     }
 
     @Override
-    public boolean equals(Object obj) {
-        // TODO Auto-generated method stub
-        return super.equals(obj);
-    }
-
-    @Override
     public PhysicalPlan clone() throws CloneNotSupportedException {
         PhysicalPlan clone = new PhysicalPlan();
 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/local/datastorage/LocalPath.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/local/datastorage/LocalPath.java?rev=831169&r1=831168&r2=831169&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/local/datastorage/LocalPath.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/local/datastorage/LocalPath.java Fri Oct 30 00:28:41 2009
@@ -26,12 +26,15 @@
 import java.util.HashMap;
 import java.util.Properties;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.backend.datastorage.ElementDescriptor;
 import org.apache.pig.backend.datastorage.SeekableInputStream;
 
 public abstract class LocalPath implements ElementDescriptor {
 
+    private Log log = LogFactory.getLog(getClass());
     protected DataStorage fs;
     protected File path;
 
@@ -121,7 +124,9 @@
     }
 
     public void delete() throws IOException {
-        getCurPath().delete();
+        boolean res = getCurPath().delete();
+        if (!res)
+            log.warn("LocalPath.delete: failed to delete" + getCurPath());
     }
 
     public Properties getConfiguration() throws IOException {

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java?rev=831169&r1=831168&r2=831169&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java Fri Oct 30 00:28:41 2009
@@ -268,10 +268,6 @@
 	    return result;
 	}
 	
-	public boolean equals(Object obj) {
-	    return this.equals(obj);
-	}
-	
     }
 
 }
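
[Note: the equals() removed above called this.equals(obj), i.e. it recursed on itself and could never return. A standalone demonstration of that defect, using a hypothetical class rather than Pig code:]

    public class SelfEquals {
        @Override
        public boolean equals(Object obj) {
            return this.equals(obj); // dispatches back to this same method
        }

        public static void main(String[] args) {
            try {
                new SelfEquals().equals("anything");
            } catch (StackOverflowError e) {
                // equals never terminates; the recursion exhausts the stack
                System.out.println("StackOverflowError from self-recursive equals");
            }
        }
    }

Deleting the override falls back to Object.equals (reference identity), which is well-defined and is what FindBugs expects here.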

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java?rev=831169&r1=831168&r2=831169&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/BinStorage.java Fri Oct 30 00:28:41 2009
@@ -405,6 +405,10 @@
         return true;
     }
 
+    public int hashCode() {
+        return 42; 
+    }
+
     /* (non-Javadoc)
      * @see org.apache.pig.StoreFunc#getStorePreparationClass()
      */

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/BinaryStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/BinaryStorage.java?rev=831169&r1=831168&r2=831169&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/BinaryStorage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/BinaryStorage.java Fri Oct 30 00:28:41 2009
@@ -143,6 +143,10 @@
         return true;
     }
 
+    public int hashCode() {
+        return 42; 
+    }
+
     /* (non-Javadoc)
      * @see org.apache.pig.LoadFunc#determineSchema(java.lang.String, org.apache.pig.ExecType, org.apache.pig.backend.datastorage.DataStorage)
      */

Modified: hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java?rev=831169&r1=831168&r2=831169&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/builtin/PigStorage.java Fri Oct 30 00:28:41 2009
@@ -324,13 +324,20 @@
     }
     
     public boolean equals(Object obj) {
-        return equals((PigStorage)obj);
+        if (obj instanceof PigStorage)
+            return equals((PigStorage)obj);
+        else
+            return false;
     }
 
     public boolean equals(PigStorage other) {
         return this.fieldDel == other.fieldDel;
     }
 
+    public int hashCode() {
+        return (int)fieldDel;
+    }
+
     /* (non-Javadoc)
      * @see org.apache.pig.StoreFunc#getStorePreparationClass()
      */
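
[Note: several changes in this commit add hashCode() beside an existing equals(). FindBugs flags classes that define one without the other, because two objects that compare equal must report the same hash code for HashMap/HashSet lookups to work. A minimal sketch of the contract, using a hypothetical class rather than Pig's:]

    import java.util.HashSet;
    import java.util.Set;

    public class DelimStorage {
        private final byte fieldDel;

        public DelimStorage(byte fieldDel) { this.fieldDel = fieldDel; }

        @Override
        public boolean equals(Object obj) {
            // Guard with instanceof first, as the PigStorage fix does, so a
            // foreign type yields false instead of a ClassCastException.
            if (!(obj instanceof DelimStorage)) return false;
            return this.fieldDel == ((DelimStorage) obj).fieldDel;
        }

        @Override
        public int hashCode() {
            return fieldDel; // derived from the same field equals compares
        }

        public static void main(String[] args) {
            Set<DelimStorage> set = new HashSet<DelimStorage>();
            set.add(new DelimStorage((byte) '\t'));
            // With equals but no matching hashCode this lookup could miss.
            System.out.println(set.contains(new DelimStorage((byte) '\t'))); // true
        }
    }

A constant hash code such as the "return 42" used elsewhere in this patch also satisfies the contract (equal objects trivially share it); it merely degrades hash distribution, which is acceptable for classes rarely used as hash keys.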

Modified: hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java?rev=831169&r1=831168&r2=831169&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DefaultAbstractBag.java Fri Oct 30 00:28:41 2009
@@ -29,6 +29,7 @@
 import java.util.ArrayList;
 
 import org.apache.pig.PigException;
+import org.apache.pig.PigWarning;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigLogger;
@@ -161,7 +162,9 @@
             mContents.clear();
             if (mSpillFiles != null) {
                 for (int i = 0; i < mSpillFiles.size(); i++) {
-                    mSpillFiles.get(i).delete();
+                    boolean res = mSpillFiles.get(i).delete();
+                    if (!res)
+                        warn ("DefaultAbstractBag.clear: failed to delete " + mSpillFiles.get(i), PigWarning.DELETE_FAILED, null);  
                 }
                 mSpillFiles.clear();
             }
@@ -298,7 +301,10 @@
     protected void finalize() {
         if (mSpillFiles != null) {
             for (int i = 0; i < mSpillFiles.size(); i++) {
-                mSpillFiles.get(i).delete();
+                boolean res = mSpillFiles.get(i).delete();
+                if (!res)
+                    warn ("DefaultAbstractBag.finalize: failed to delete " + mSpillFiles.get(i), PigWarning.DELETE_FAILED, null);
+                    
             }
         }
     }
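
[Note: the clear()/finalize() changes above check java.io.File.delete(), which reports failure only through its boolean return; ignoring it silently leaks spill files, which is the Bad practice warning this commit clears. A small helper capturing the same pattern — the class and method here are hypothetical, not Pig API:]

    import java.io.File;

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    public final class FileCleanup {
        private static final Log log = LogFactory.getLog(FileCleanup.class);

        private FileCleanup() {}

        // Delete a file and surface a failure instead of discarding the
        // boolean result, mirroring the warn()/log.warn() calls in the patch.
        public static void deleteOrWarn(File f) {
            if (!f.delete()) {
                log.warn("failed to delete " + f);
            }
        }
    }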

Modified: hadoop/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java?rev=831169&r1=831168&r2=831169&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/DistinctDataBag.java Fri Oct 30 00:28:41 2009
@@ -203,6 +203,17 @@
             public int compareTo(TContainer other) {
                 return tuple.compareTo(other.tuple);
             }
+
+            public boolean equals(Object obj){
+                if (obj instanceof TContainer)
+                    return tuple.equals(((TContainer)obj).tuple);
+                else
+                    return false;
+            }
+
+            public int hashCode() {
+                return tuple.hashCode(); 
+            }
         }
 
         // We have to buffer a tuple because there's no easy way for next

Modified: hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java?rev=831169&r1=831168&r2=831169&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/InternalSortedBag.java Fri Oct 30 00:28:41 2009
@@ -83,6 +83,9 @@
         	return (o == this);
         }
 
+        public int hashCode() {
+            return 42; 
+        }
     }
     
     public InternalSortedBag() {

Modified: hadoop/pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java?rev=831169&r1=831168&r2=831169&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/NonSpillableDataBag.java Fri Oct 30 00:28:41 2009
@@ -192,6 +192,10 @@
         return compareTo(obj) == 0;
     }
 
+    public int hashCode() {
+        return mContents.hashCode();
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     public int compareTo(Object other) {

Modified: hadoop/pig/trunk/src/org/apache/pig/data/SingleTupleBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/SingleTupleBag.java?rev=831169&r1=831168&r2=831169&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/SingleTupleBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/SingleTupleBag.java Fri Oct 30 00:28:41 2009
@@ -157,6 +157,15 @@
         return 0;
     }
 
+    public boolean equals(Object o){
+        // TODO: match to compareTo if it is updated
+        return true;
+    }
+
+    public int hashCode() {
+        return 42; 
+    }
+
     class TBIterator implements Iterator<Tuple> {
         boolean nextDone = false;
         /* (non-Javadoc)

Modified: hadoop/pig/trunk/src/org/apache/pig/data/SortedDataBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/SortedDataBag.java?rev=831169&r1=831168&r2=831169&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/SortedDataBag.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/SortedDataBag.java Fri Oct 30 00:28:41 2009
@@ -71,6 +71,10 @@
             return false;
         }
 
+        public int hashCode() {
+            return 42; 
+        }
+
     }
 
     /**
@@ -177,6 +181,17 @@
             public int compareTo(PQContainer other) {
                 return mComp.compare(tuple, other.tuple);
             }
+
+            public boolean equals(Object other) {
+                if (other instanceof PQContainer)
+                    return tuple.equals(((PQContainer)other).tuple);
+                else
+                    return false;
+            }
+
+            public int hashCode() {
+                return tuple.hashCode(); 
+            }
         }
 
         // We have to buffer a tuple because there's no easy way for next

Modified: hadoop/pig/trunk/src/org/apache/pig/data/TargetedTuple.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/data/TargetedTuple.java?rev=831169&r1=831168&r2=831169&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/data/TargetedTuple.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/data/TargetedTuple.java Fri Oct 30 00:28:41 2009
@@ -151,6 +151,15 @@
         return t.compareTo(o);
     }
     
+    @SuppressWarnings("unchecked")
+    public boolean equals(Object o) {
+        return t.equals(o);
+    }
+
+    public int hashCode() {
+        return t.hashCode();
+    }
+    
     /**
      * @return true if this Tuple is null
      */

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java?rev=831169&r1=831168&r2=831169&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java Fri Oct 30 00:28:41 2009
@@ -407,7 +407,9 @@
             // TODO probably this should be replaced with the local file system
             File f = (new File(fileSpec)).getParentFile();
             if (f!=null){
-                f.mkdirs();
+                boolean res = f.mkdirs();
+                if (!res)
+                    log.warn("FileLocalizer.create: failed to create " + f);
             }
             
             return new FileOutputStream(fileSpec,append);

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java?rev=831169&r1=831168&r2=831169&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java Fri Oct 30 00:28:41 2009
@@ -502,10 +502,16 @@
         }
         
         public boolean equals(Object obj) {
-          HandleSpec other = (HandleSpec)obj;
-          return (other != null && name.equals(other.name) && spec.equals(other.spec));
+            if (obj instanceof HandleSpec){
+                HandleSpec other = (HandleSpec)obj;
+                return (other != null && name.equals(other.name) && spec.equals(other.spec));
+            } else 
+                return false;
         }
 
+        public int hashCode() {
+            return name.hashCode();
+        }
 
         public Object clone() {
           try {

Modified: hadoop/pig/trunk/test/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/findbugsExcludeFile.xml?rev=831169&r1=831168&r2=831169&view=diff
==============================================================================
--- hadoop/pig/trunk/test/findbugsExcludeFile.xml (original)
+++ hadoop/pig/trunk/test/findbugsExcludeFile.xml Fri Oct 30 00:28:41 2009
@@ -37,6 +37,9 @@
         <Bug pattern="EI_EXPOSE_REP2" />
     </Match>
     <Match>
+        <Bug pattern="DP_CREATE_CLASSLOADER_INSIDE_DO_PRIVILEGED" />
+    </Match>
+    <Match>
         <Class name="org.apache.pig.tools.parameters.Token" />
     </Match>
     <Match>
@@ -146,6 +149,10 @@
         <Bug pattern="SIC_INNER_SHOULD_BE_STATIC" />
     </Match>
     <Match>
+        <Class name="org.apache.pig.data.InternalDistinctBag$DistinctDataBagIterator$TContainer" />
+        <Bug pattern="SIC_INNER_SHOULD_BE_STATIC" />
+    </Match>
+    <Match>
         <Bug pattern="BC_BAD_CAST_TO_CONCRETE_COLLECTION" />
     </Match>
     <!-- This Tuple classes are not used -->
@@ -236,5 +243,13 @@
         <Method name = "sendMTFValues" />
         <Bug pattern="IM_BAD_CHECK_FOR_ODD" />
     </Match>
+    <Match>
+        <Class name="org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger" />
+        <Bug pattern="UG_SYNC_SET_UNSYNC_GET" />
+    </Match>
+    <Match>
+        <Class name="org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream" />
+        <Bug pattern="IS2_INCONSISTENT_SYNC" />
+    </Match>
     
 </FindBugsFilter>