Posted to commits@pig.apache.org by ro...@apache.org on 2014/04/30 16:00:20 UTC

svn commit: r1591297 - in /pig/trunk: ./ shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/ shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengi...

Author: rohini
Date: Wed Apr 30 14:00:20 2014
New Revision: 1591297

URL: http://svn.apache.org/r1591297
Log:
PIG-3672: Pig should not check for hardcoded file system implementations (rohini)

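The gist of the change, condensed here as an illustrative sketch (not a
verbatim quote of any single hunk): instead of matching output URIs against a
hardcoded list of scheme prefixes, Pig now asks the Hadoop configuration
whether the path's scheme is backed by a FileSystem implementation.

    // Before (UriUtil.isHDFSFileOrLocalOrS3N, hardcoded scheme list):
    return uri.startsWith("/") || uri.startsWith("hdfs:")
            || uri.startsWith("viewfs:") || uri.startsWith("file:")
            || uri.startsWith("s3n:");

    // After (delegates to the new shim; any registered scheme qualifies):
    return HadoopShims.hasFileSystemImpl(new Path(uri), conf);
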
Added:
    pig/trunk/test/org/apache/pig/parser/TestQueryParserUtils.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
    pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
    pig/trunk/src/org/apache/pig/PigServer.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigStatsOutputSizeReader.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
    pig/trunk/src/org/apache/pig/impl/util/UriUtil.java
    pig/trunk/src/org/apache/pig/impl/util/Utils.java
    pig/trunk/src/org/apache/pig/parser/QueryParserUtils.java
    pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java
    pig/trunk/test/org/apache/pig/test/TestMRJobStats.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1591297&r1=1591296&r2=1591297&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Apr 30 14:00:20 2014
@@ -32,6 +32,8 @@ PIG-2207: Support custom counters for ag
 
 IMPROVEMENTS
 
+PIG-3672: Pig should not check for hardcoded file system implementations (rohini)
+
 PIG-3860: Refactor PigStatusReporter and PigLogger for non-MR execution engine (cheolsoo)
 
 PIG-3865: Remodel the XMLLoader to work to be faster and more maintainable (aseldawy via daijy)

Modified: pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java?rev=1591297&r1=1591296&r2=1591297&view=diff
==============================================================================
--- pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java (original)
+++ pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java Wed Apr 30 14:00:20 2014
@@ -119,14 +119,33 @@ public class HadoopShims {
     public static void unsetConf(Configuration conf, String key) {
         // Not supported in Hadoop 0.20/1.x
     }
-    
+
     /**
-     * Fetch mode needs to explicitly set the task id which is otherwise done by Hadoop 
+     * Fetch mode needs to explicitly set the task id which is otherwise done by Hadoop
      * @param conf
      * @param taskAttemptID
      */
     public static void setTaskAttemptId(Configuration conf, TaskAttemptID taskAttemptID) {
         conf.set("mapred.task.id", taskAttemptID.toString());
     }
-    
+
+    /**
+     * Returns whether the given path has a FileSystem implementation.
+     *
+     * @param path path
+     * @param conf configuration
+     * @return true if the given path's scheme has a FileSystem implementation,
+     *         false otherwise
+     */
+    public static boolean hasFileSystemImpl(Path path, Configuration conf) {
+        String scheme = path.toUri().getScheme();
+        if (scheme != null) {
+            String fsImpl = conf.get("fs." + scheme + ".impl");
+            if (fsImpl == null) {
+                return false;
+            }
+        }
+        return true;
+    }
+
 }

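On Hadoop 0.20/1.x the new shim method reduces to a plain fs.<scheme>.impl
lookup. A minimal usage sketch (the myfs scheme and com.example.MyFileSystem
are made-up names for illustration):

    Configuration conf = new Configuration();
    conf.set("fs.myfs.impl", "com.example.MyFileSystem");             // hypothetical
    HadoopShims.hasFileSystemImpl(new Path("myfs://host/x"), conf);   // true
    HadoopShims.hasFileSystemImpl(new Path("nosuch://host/x"), conf); // false
    HadoopShims.hasFileSystemImpl(new Path("/no/scheme/path"), conf); // true (no scheme)
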
Modified: pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java?rev=1591297&r1=1591296&r2=1591297&view=diff
==============================================================================
--- pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java (original)
+++ pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java Wed Apr 30 14:00:20 2014
@@ -18,7 +18,10 @@
 package org.apache.pig.backend.hadoop.executionengine.shims;
 
 import java.io.IOException;
+import java.lang.reflect.Method;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -41,6 +44,10 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop23.PigJobControl;
 
 public class HadoopShims {
+
+    private static Log LOG = LogFactory.getLog(HadoopShims.class);
+    private static Method getFileSystemClass;
+
     static public JobContext cloneJobContext(JobContext original) throws IOException, InterruptedException {
         JobContext newContext = ContextFactory.cloneContext(original,
                 new JobConf(original.getConfiguration()));
@@ -103,7 +110,7 @@ public class HadoopShims {
     public static JobControl newJobControl(String groupName, int timeToSleep) {
       return new PigJobControl(groupName, timeToSleep);
     }
-    
+
     public static long getDefaultBlockSize(FileSystem fs, Path path) {
         return fs.getDefaultBlockSize(path);
     }
@@ -111,22 +118,63 @@ public class HadoopShims {
     public static Counters getCounters(Job job) throws IOException, InterruptedException {
         return new Counters(job.getJob().getCounters());
     }
-    
+
     public static boolean isJobFailed(TaskReport report) {
         return report.getCurrentStatus()==TIPStatus.FAILED;
     }
-    
+
     public static void unsetConf(Configuration conf, String key) {
         conf.unset(key);
     }
-    
+
     /**
-     * Fetch mode needs to explicitly set the task id which is otherwise done by Hadoop 
+     * Fetch mode needs to explicitly set the task id which is otherwise done by Hadoop
      * @param conf
      * @param taskAttemptID
      */
     public static void setTaskAttemptId(Configuration conf, TaskAttemptID taskAttemptID) {
         conf.setInt("mapreduce.job.application.attempt.id", taskAttemptID.getId());
     }
-    
+
+    /**
+     * Returns whether the given path has a FileSystem implementation.
+     *
+     * @param path path
+     * @param conf configuration
+     * @return true if the given path's scheme has a FileSystem implementation,
+     *         false otherwise
+     */
+    public static boolean hasFileSystemImpl(Path path, Configuration conf) {
+        String scheme = path.toUri().getScheme();
+        if (scheme != null) {
+            // Hadoop 0.23
+            if (conf.get("fs.file.impl") != null) {
+                String fsImpl = conf.get("fs." + scheme + ".impl");
+                if (fsImpl == null) {
+                    return false;
+                }
+            } else {
+                // Hadoop 2.x HADOOP-7549
+                if (getFileSystemClass == null) {
+                    try {
+                        getFileSystemClass = FileSystem.class.getDeclaredMethod(
+                                "getFileSystemClass", String.class, Configuration.class);
+                    } catch (NoSuchMethodException e) {
+                        LOG.warn("Error while trying to determine if path " + path +
+                                " has a filesystem implementation");
+                        // Assume has implementation to be safe
+                        return true;
+                    }
+                }
+                try {
+                    Object fs = getFileSystemClass.invoke(null, scheme, conf);
+                    return fs != null;
+                } catch (Exception e) {
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+
 }

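Why the extra branching above: Hadoop 0.23 still ships fs.<scheme>.impl
entries in its defaults (hence the fs.file.impl probe), whereas Hadoop 2.x
removed them in HADOOP-7549 and resolves filesystems through
FileSystem.getFileSystemClass(), a method that does not exist in 0.23 and is
therefore located via reflection. The equivalent direct call on Hadoop 2.x
would be (sketch):

    // Hadoop 2.x only; consults fs.<scheme>.impl plus the ServiceLoader
    // registry, and throws IOException for an unknown scheme.
    Class<? extends FileSystem> fsClass =
            FileSystem.getFileSystemClass("hdfs", conf);
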
Modified: pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigServer.java?rev=1591297&r1=1591296&r2=1591297&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ pig/trunk/src/org/apache/pig/PigServer.java Wed Apr 30 14:00:20 2014
@@ -127,7 +127,7 @@ public class PigServer {
     public static final String PRETTY_PRINT_SCHEMA_PROPERTY = "pig.pretty.print.schema";
     private static final String PIG_LOCATION_CHECK_STRICT = "pig.location.check.strict";
 
-    /*    
+    /*
      * The data structure to support grunt shell operations.
      * The grunt shell can only work on one graph at a time.
      * If a script is contained inside another script, the grunt
@@ -176,10 +176,10 @@ public class PigServer {
     /**
      * @param execTypeString can be 'mapreduce' or 'local'.  Local mode will
      * use Hadoop's local job runner to execute the job on the local machine.
-     * Mapreduce mode will connect to a cluster to execute the job. If 
+     * Mapreduce mode will connect to a cluster to execute the job. If
      * execTypeString is not one of these two, Pig will deduce the ExecutionEngine
      * if it is on the classpath and use it for the backend execution.
-     * @throws ExecException 
+     * @throws ExecException
      * @throws IOException
      */
     public PigServer(String execTypeString) throws ExecException, IOException {
@@ -293,7 +293,7 @@ public class PigServer {
 
     /**
      * Current DAG
-     * 
+     *
      * @return
      */
     public Graph getCurrentDAG() {
@@ -368,7 +368,7 @@ public class PigServer {
      * should be followed by {@link PigServer#executeBatch(boolean)} with
      * argument as false. Do Not use {@link PigServer#executeBatch()} after
      * calling this method as that will re-parse and build the script.
-     * 
+     *
      * @throws IOException
      */
     public void parseAndBuild() throws IOException {
@@ -383,7 +383,7 @@ public class PigServer {
 
     /**
      * Submits a batch of Pig commands for execution.
-     * 
+     *
      * @return list of jobs being executed
      * @throws IOException
      */
@@ -395,7 +395,7 @@ public class PigServer {
      * Submits a batch of Pig commands for execution. Parse and build of script
      * should be skipped if user called {@link PigServer#parseAndBuild()}
      * before. Pass false as an argument in which case.
-     * 
+     *
      * @param parseAndBuild
      * @return
      * @throws IOException
@@ -1049,7 +1049,7 @@ public class PigServer {
      * @param lps Stream to print the logical tree
      * @param eps Stream to print the ExecutionEngine trees. If null, then will print to files
      * @param dir Directory to print ExecutionEngine trees. If null, will use eps
-     * @param suffix Suffix of file names 
+     * @param suffix Suffix of file names
      * @throws IOException if the requested alias cannot be found.
      */
     public void explain(String alias,
@@ -1063,7 +1063,7 @@ public class PigServer {
         try {
             pigContext.inExplain = true;
             buildStorePlan( alias );
-            
+
             //Only add root xml node if all plans are being written to same stream.
             if (format == "xml" && lps == eps) {
                 lps.println("<plan>");
@@ -1083,7 +1083,7 @@ public class PigServer {
             if (format.equals("xml") && lps == eps) {
                 lps.println("</plan>");
             }
-            
+
             if (markAsExecute) {
                 currDAG.markAsExecuted();
             }
@@ -1408,13 +1408,13 @@ public class PigServer {
         }
         return op;
     }
-    
+
     /**
      * Returns data associated with LogicalPlan. It makes
      * sense to call this method only after a query/script
      * has been registered with one of the {@link #registerQuery(String)}
      * or {@link #registerScript(InputStream)} methods.
-     * 
+     *
      * @return LogicalPlanData
      */
     public LogicalPlanData getLogicalPlanData() {
@@ -1730,7 +1730,7 @@ public class PigServer {
         private void compile(LogicalPlan lp) throws FrontendException  {
             DanglingNestedNodeRemover DanglingNestedNodeRemover = new DanglingNestedNodeRemover( lp );
             DanglingNestedNodeRemover.visit();
-            
+
             new ColumnAliasConversionVisitor(lp).visit();
             new SchemaAliasVisitor(lp).visit();
             new ScalarVisitor(lp, pigContext, scope).visit();
@@ -1813,14 +1813,14 @@ public class PigServer {
         /**
          * This method checks whether the multiple sinks (STORE) use the same
          * "file-based" location. If yes, throws a RuntimeException
-         * 
+         *
          * @param storeOps
          */
         private void checkDuplicateStoreLoc(Set<LOStore> storeOps) {
             Set<String> uniqueStoreLoc = new HashSet<String>();
             for(LOStore store : storeOps) {
                 String fileName = store.getFileSpec().getFileName();
-                if(!uniqueStoreLoc.add(fileName) && UriUtil.isHDFSFileOrLocalOrS3N(fileName)) {
+                if(!uniqueStoreLoc.add(fileName) && UriUtil.isHDFSFileOrLocalOrS3N(fileName, new Configuration(true))) {
                     throw new RuntimeException("Script contains 2 or more STORE statements writing to same location : "+ fileName);
                 }
             }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java?rev=1591297&r1=1591296&r2=1591297&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/FileBasedOutputSizeReader.java Wed Apr 30 14:00:20 2014
@@ -35,15 +35,15 @@ public class FileBasedOutputSizeReader i
 
     private static final Log log = LogFactory.getLog(FileBasedOutputSizeReader.class);
 
-    /** 
+    /**
      * Returns whether the given POStore is supported by this output size reader
      * or not. We check whether the uri scheme of output file is one of hdfs,
      * local, and s3.
      * @param sto POStore
      */
     @Override
-    public boolean supports(POStore sto) {
-        return UriUtil.isHDFSFileOrLocalOrS3N(getLocationUri(sto));
+    public boolean supports(POStore sto, Configuration conf) {
+        return UriUtil.isHDFSFileOrLocalOrS3N(getLocationUri(sto), conf);
     }
 
     /**
@@ -53,7 +53,7 @@ public class FileBasedOutputSizeReader i
      */
     @Override
     public long getOutputSize(POStore sto, Configuration conf) throws IOException {
-        if (!supports(sto)) {
+        if (!supports(sto, conf)) {
             log.warn("'" + sto.getStoreFunc().getClass().getName()
                     + "' is not supported by " + getClass().getName());
             return -1;

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java?rev=1591297&r1=1591296&r2=1591297&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/InputSizeReducerEstimator.java Wed Apr 30 14:00:20 2014
@@ -109,7 +109,7 @@ public class InputSizeReducerEstimator i
                 // the input file location might be a list of comma separated files,
                 // separate them out
                 for (String location : LoadFunc.getPathStrings(ld.getLFile().getFileName())) {
-                    if (UriUtil.isHDFSFileOrLocalOrS3N(location)) {
+                    if (UriUtil.isHDFSFileOrLocalOrS3N(location, conf)) {
                         Path path = new Path(location);
                         FileSystem fs = path.getFileSystem(conf);
                         FileStatus[] status = fs.globStatus(path);

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1591297&r1=1591296&r2=1591297&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Wed Apr 30 14:00:20 2014
@@ -96,6 +96,7 @@ public class MapReduceLauncher extends L
 
     private boolean aggregateWarning = false;
 
+    @Override
     public void kill() {
         try {
             log.debug("Receive kill signal");
@@ -112,6 +113,7 @@ public class MapReduceLauncher extends L
         }
     }
 
+    @Override
     public void killJob(String jobID, Configuration conf) throws BackendException {
         try {
             if (conf != null) {
@@ -440,7 +442,7 @@ public class MapReduceLauncher extends L
             // Clean up all the intermediate data
             for (String path : intermediateVisitor.getIntermediate()) {
                 // Skip non-file system paths such as hbase, see PIG-3617
-                if (Utils.hasFileSystemImpl(new Path(path), conf)) {
+                if (HadoopShims.hasFileSystemImpl(new Path(path), conf)) {
                     FileLocalizer.delete(path, pc);
                 }
             }
@@ -728,7 +730,7 @@ public class MapReduceLauncher extends L
         if(shouldMarkOutputDir(job)) {
             Path outputPath = new Path(store.getSFile().getFileName());
             String scheme = outputPath.toUri().getScheme();
-            if (Utils.hasFileSystemImpl(outputPath, job.getJobConf())) {
+            if (HadoopShims.hasFileSystemImpl(outputPath, job.getJobConf())) {
                 FileSystem fs = outputPath.getFileSystem(job.getJobConf());
                 if (fs.exists(outputPath)) {
                     // create a file in the folder to mark it

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigStatsOutputSizeReader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigStatsOutputSizeReader.java?rev=1591297&r1=1591296&r2=1591297&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigStatsOutputSizeReader.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigStatsOutputSizeReader.java Wed Apr 30 14:00:20 2014
@@ -30,7 +30,7 @@ import org.apache.pig.classification.Int
  * HBaseStorage), the output size cannot always be computed as the total size of
  * output files.
  *
- * @see FileBasedOutputSizeReader 
+ * @see FileBasedOutputSizeReader
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
@@ -38,12 +38,13 @@ public interface PigStatsOutputSizeReade
 
     static final String OUTPUT_SIZE_READER_KEY = "pig.stats.output.size.reader";
 
-    /** 
+    /**
      * Returns whether the given PSStore is supported by this output size reader
      * or not.
      * @param sto POStore
+     * @param conf Configuration
      */
-    public boolean supports(POStore sto);
+    public boolean supports(POStore sto, Configuration conf);
 
     /**
      * Returns the size of output in bytes. If the size of output cannot be

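Because PigStatsOutputSizeReader is a public, evolving interface, custom
readers must move to the two-argument supports(). A minimal sketch of a
conforming implementation (the class name and the delegation to UriUtil are
illustrative, not prescribed):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigStatsOutputSizeReader;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
    import org.apache.pig.impl.util.UriUtil;

    public class MyOutputSizeReader implements PigStatsOutputSizeReader {
        @Override
        public boolean supports(POStore sto, Configuration conf) {
            // The new conf argument lets readers resolve schemes dynamically
            // instead of keeping their own hardcoded prefix lists.
            return UriUtil.isHDFSFileOrLocalOrS3N(
                    sto.getSFile().getFileName(), conf);
        }

        @Override
        public long getOutputSize(POStore sto, Configuration conf)
                throws IOException {
            return -1; // illustrative stub: size unknown
        }
    }
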
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=1591297&r1=1591296&r2=1591297&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Wed Apr 30 14:00:20 2014
@@ -46,6 +46,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
@@ -67,7 +68,7 @@ import org.apache.pig.impl.util.Utils;
 public class MapRedUtil {
 
     private static Log log = LogFactory.getLog(MapRedUtil.class);
-         
+
     public static final String FILE_SYSTEM_NAME = "fs.default.name";
 
     /**
@@ -85,8 +86,8 @@ public class MapRedUtil {
         Map<E, Pair<Integer, Integer>> reducerMap = new HashMap<E, Pair<Integer, Integer>>();
 
         // use local file system to get the keyDistFile
-        Configuration conf = new Configuration(false);            
-        
+        Configuration conf = new Configuration(false);
+
         if (mapConf.get("yarn.resourcemanager.principal")!=null) {
             conf.set("yarn.resourcemanager.principal", mapConf.get("yarn.resourcemanager.principal"));
         }
@@ -121,7 +122,7 @@ public class MapRedUtil {
             Integer minIndex = (Integer) idxTuple.get(idxTuple.size() - 2);
             // Used to replace the maxIndex with the number of reducers
             if (maxIndex < minIndex) {
-                maxIndex = totalReducers[0] + maxIndex; 
+                maxIndex = totalReducers[0] + maxIndex;
             }
             E keyT;
 
@@ -131,7 +132,7 @@ public class MapRedUtil {
                 // it in the reducer map
                 Tuple keyTuple = TupleFactory.getInstance().newTuple();
                 for (int i=0; i < idxTuple.size() - 2; i++) {
-                    keyTuple.append(idxTuple.get(i));	
+                    keyTuple.append(idxTuple.get(i));
                 }
                 keyT = (E) keyTuple;
             } else {
@@ -169,12 +170,12 @@ public class MapRedUtil {
     public static void setupUDFContext(Configuration job) throws IOException {
         UDFContext udfc = UDFContext.getUDFContext();
         udfc.addJobConf(job);
-        // don't deserialize in front-end 
+        // don't deserialize in front-end
         if (udfc.isUDFConfEmpty()) {
             udfc.deserialize();
         }
     }
-    
+
     /**
      * Sets up output and log dir paths for a single-store streaming job
      *
@@ -187,7 +188,7 @@ public class MapRedUtil {
             Configuration conf) throws IOException {
         // set out filespecs
         String outputPathString = st.getSFile().getFileName();
-        if (!outputPathString.contains("://") || outputPathString.startsWith("hdfs://")) {
+        if (HadoopShims.hasFileSystemImpl(new Path(outputPathString), conf)) {
             conf.set("pig.streaming.log.dir",
                     new Path(outputPathString, JobControlCompiler.LOG_DIR).toString());
         }
@@ -245,11 +246,11 @@ public class MapRedUtil {
 
     /**
      * Get all files recursively from the given list of files
-     * 
+     *
      * @param files a list of FileStatus
      * @param conf the configuration object
      * @return the list of fileStatus that contains all the files in the given
-     *         list and, recursively, all the files inside the directories in 
+     *         list and, recursively, all the files inside the directories in
      *         the given list
      * @throws IOException
      */
@@ -267,12 +268,12 @@ public class MapRedUtil {
                 result.add(file);
             }
         }
-        log.info("Total input paths to process : " + result.size()); 
+        log.info("Total input paths to process : " + result.size());
         return result;
     }
-    
+
     private static void addInputPathRecursively(List<FileStatus> result,
-            FileSystem fs, Path path, PathFilter inputFilter) 
+            FileSystem fs, Path path, PathFilter inputFilter)
             throws IOException {
         for (FileStatus stat: fs.listStatus(path, inputFilter)) {
             if (stat.isDir()) {
@@ -284,6 +285,7 @@ public class MapRedUtil {
     }
 
     private static final PathFilter hiddenFileFilter = new PathFilter(){
+        @Override
         public boolean accept(Path p){
             String name = p.getName();
             return !name.startsWith("_") && !name.startsWith(".");
@@ -319,7 +321,7 @@ public class MapRedUtil {
             return cmp == 0 ? 0 : cmp < 0 ? -1 : 1;
         }
     };
-    
+
     private static final class ComparableSplit implements Comparable<ComparableSplit> {
         private InputSplit rawInputSplit;
         private HashSet<Node> nodes;
@@ -330,32 +332,32 @@ public class MapRedUtil {
             nodes = new HashSet<Node>();
             this.id = id;
         }
-        
+
         void add(Node node) {
             nodes.add(node);
         }
-        
+
         void removeFromNodes() {
             for (Node node : nodes)
                 node.remove(this);
         }
-        
+
         public InputSplit getSplit() {
             return rawInputSplit;
         }
-  
+
         @Override
         public boolean equals(Object other) {
             if (other == null || !(other instanceof ComparableSplit))
                 return false;
             return (compareTo((ComparableSplit) other) == 0);
         }
-        
+
         @Override
         public int hashCode() {
             return 41;
         }
-        
+
         @Override
         public int compareTo(ComparableSplit other) {
             try {
@@ -369,41 +371,41 @@ public class MapRedUtil {
             }
         }
     }
-      
+
     private static class DummySplit extends InputSplit {
         private long length;
-        
+
         @Override
         public String[] getLocations() {
             return null;
         }
-        
+
         @Override
         public long getLength() {
             return length;
         }
-        
+
         public void setLength(long length) {
             this.length = length;
         }
     }
-    
+
     private static class Node {
         private long length = 0;
         private ArrayList<ComparableSplit> splits;
         private boolean sorted;
-        
+
         Node() throws IOException, InterruptedException {
             length = 0;
             splits = new ArrayList<ComparableSplit>();
             sorted = false;
         }
-        
+
         void add(ComparableSplit split) throws IOException, InterruptedException {
             splits.add(split);
             length++;
         }
-        
+
         void remove(ComparableSplit split) {
             if (!sorted)
                 sort();
@@ -413,23 +415,23 @@ public class MapRedUtil {
                 length--;
             }
         }
-        
+
         void sort() {
             if (!sorted) {
                 Collections.sort(splits);
                 sorted = true;
             }
         }
-        
+
         ArrayList<ComparableSplit> getSplits() {
             return splits;
         }
-  
+
         public long getLength() {
             return length;
         }
     }
-  
+
     public static List<List<InputSplit>> getCombinePigSplits(List<InputSplit>
         oneInputSplits, long maxCombinedSplitSize, Configuration conf)
           throws IOException, InterruptedException {
@@ -438,13 +440,13 @@ public class MapRedUtil {
         List<List<InputSplit>> result = new ArrayList<List<InputSplit>>();
         List<Long> resultLengths = new ArrayList<Long>();
         long comparableSplitId = 0;
-        
+
         int size = 0, nSplits = oneInputSplits.size();
         InputSplit lastSplit = null;
         int emptyCnt = 0;
         for (InputSplit split : oneInputSplits) {
             if (split.getLength() == 0) {
-                emptyCnt++; 
+                emptyCnt++;
                 continue;
             }
             if (split.getLength() >= maxCombinedSplitSize) {
@@ -461,7 +463,7 @@ public class MapRedUtil {
                 HashSet<String> locationSeen = new HashSet<String>();
                 for (String location : locations)
                 {
-                    if (!locationSeen.contains(location)) 
+                    if (!locationSeen.contains(location))
                     {
                         Node node = nodeMap.get(location);
                         if (node == null) {
@@ -488,7 +490,7 @@ public class MapRedUtil {
               ArrayList<ComparableSplit> splits = node.getSplits();
               for (ComparableSplit split : splits) {
                 if (!seen.contains(split.getSplit())) {
-                  // remove duplicates. The set has to be on the raw input split not the 
+                  // remove duplicates. The set has to be on the raw input split not the
                   // comparable input split as the latter overrides the compareTo method
                   // so its equality semantics is changed and not we want here
                   seen.add(split.getSplit());
@@ -497,7 +499,7 @@ public class MapRedUtil {
               }
             }
           }
-          
+
           int combinedSplitLen = 0;
           for (PigSplit split : result)
             combinedSplitLen += split.getNumPaths();
@@ -581,7 +583,7 @@ public class MapRedUtil {
             for (Node node : nodes) {
                 for (ComparableSplit split : node.getSplits()) {
                     if (!seen.contains(split.getSplit())) {
-                        // remove duplicates. The set has to be on the raw input split not the 
+                        // remove duplicates. The set has to be on the raw input split not the
                         // comparable input split as the latter overrides the compareTo method
                         // so its equality semantics is changed and not we want here
                         seen.add(split.getSplit());
@@ -589,7 +591,7 @@ public class MapRedUtil {
                     }
                 }
             }
-            
+
             /* verification code
             int combinedSplitLen = 0;
             for (PigSplit split : result)
@@ -602,7 +604,7 @@ public class MapRedUtil {
                 long totalSize = 0;
                 ArrayList<InputSplit> combinedSplits = new ArrayList<InputSplit>();
                 ArrayList<ComparableSplit> combinedComparableSplits = new ArrayList<ComparableSplit>();
-                
+
                 int splitLen = leftoverSplits.size();
                 for (int i = 0; i < splitLen; i++)
                 {
@@ -649,26 +651,26 @@ public class MapRedUtil {
           combinedSplitLen += split.getNumPaths();
         if (combinedSplitLen != nSplits-emptyCnt)
           throw new AssertionError("number of combined splits ["+combinedSplitLen+"] does not match the number of original splits ["+nSplits+"].");
-        
+
         long totalLen = 0;
         for (PigSplit split : result)
           totalLen += split.getLength();
-        
+
         long origTotalLen = 0;
         for (InputSplit split : oneInputSplits)
           origTotalLen += split.getLength();
         if (totalLen != origTotalLen)
           throw new AssertionError("The total length ["+totalLen+"] does not match the original ["+origTotalLen+"]");
-        */ 
+        */
         log.info("Total input paths (combined) to process : " + result.size());
         return result;
     }
-    
+
     private static void removeSplits(List<ComparableSplit> splits) {
         for (ComparableSplit split: splits)
             split.removeFromNodes();
     }
-    
+
     public String inputSplitToString(InputSplit[] splits) throws IOException, InterruptedException {
         // debugging purpose only
         StringBuilder st = new StringBuilder();
@@ -681,11 +683,11 @@ public class MapRedUtil {
             st.append("Input split["+i+"]:\n   Length = "+ splits[i].getLength()+"\n  Locations:\n");
             for (String location :  splits[i].getLocations())
                 st.append("    "+location+"\n");
-            st.append("\n-----------------------\n"); 
+            st.append("\n-----------------------\n");
         }
         return st.toString();
     }
-    
+
     /* verification code: debug purpose only
     public String inputSplitToString(ArrayList<ComparableSplit> splits) throws IOException, InterruptedException {
       StringBuilder st = new StringBuilder();
@@ -698,7 +700,7 @@ public class MapRedUtil {
         st.append("Input split["+i+"]:\n   Length = "+ splits.get(i).getSplit().getLength()+"\n  Locations:\n");
         for (String location :  splits.get(i).getSplit().getLocations())
           st.append("    "+location+"\n");
-        st.append("\n-----------------------\n"); 
+        st.append("\n-----------------------\n");
       }
       return st.toString();
     }

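The streaming hunk above also closes a gap: the old test,
!outputPathString.contains("://") || outputPathString.startsWith("hdfs://"),
rejected every non-hdfs scheme, so e.g. an s3n:// output never had
pig.streaming.log.dir set even though s3n is a real filesystem. With the
shim, any scheme backed by a FileSystem qualifies (sketch):

    // Old check: false for "s3n://bucket/out" (contains "://", not hdfs://)
    // New check: true whenever the s3n scheme resolves to a FileSystem
    if (HadoopShims.hasFileSystemImpl(new Path("s3n://bucket/out"), conf)) {
        // pig.streaming.log.dir is set for this store as well
    }
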
Modified: pig/trunk/src/org/apache/pig/impl/util/UriUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/UriUtil.java?rev=1591297&r1=1591296&r2=1591297&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/UriUtil.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/UriUtil.java Wed Apr 30 14:00:20 2014
@@ -17,24 +17,25 @@
  */
 package org.apache.pig.impl.util;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+
 public class UriUtil {
     public static boolean isHDFSFile(String uri){
         if(uri == null)
             return false;
-        if(uri.startsWith("/") || uri.startsWith("hdfs:") || uri.startsWith("viewfs:")) {
+        if (uri.startsWith("/") || uri.startsWith("hdfs:") || uri.startsWith("viewfs:") ||
+                uri.startsWith("hftp:") || uri.startsWith("webhdfs:")) {
             return true;
         }
         return false;
     }
 
-    public static boolean isHDFSFileOrLocalOrS3N(String uri){
+    public static boolean isHDFSFileOrLocalOrS3N(String uri, Configuration conf){
         if(uri == null)
             return false;
-        if(uri.startsWith("/") || uri.matches("[A-Za-z]:.*") || uri.startsWith("hdfs:")
-                || uri.startsWith("viewfs:") || uri.startsWith("file:") || uri.startsWith("s3n:")) {
-            return true;
-        }
-        return false;
+        return HadoopShims.hasFileSystemImpl(new Path(uri), conf);
     }
-    
+
 }

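The practical effect of the rewritten isHDFSFileOrLocalOrS3N, sketched with a
default Configuration (assumes no fs.hbase.impl is registered, per the
PIG-3617 motivation for skipping such paths):

    Configuration conf = new Configuration(true);
    UriUtil.isHDFSFileOrLocalOrS3N("/user/pig/out", conf);   // true: no scheme
    UriUtil.isHDFSFileOrLocalOrS3N("hdfs://nn1/out", conf);  // true: impl exists
    UriUtil.isHDFSFileOrLocalOrS3N("hbase://mytable", conf); // false: no FileSystem
    UriUtil.isHDFSFileOrLocalOrS3N(null, conf);              // false
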
Modified: pig/trunk/src/org/apache/pig/impl/util/Utils.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/Utils.java?rev=1591297&r1=1591296&r2=1591297&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/Utils.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/Utils.java Wed Apr 30 14:00:20 2014
@@ -525,27 +525,6 @@ public class Utils {
         return baos.toString();
     }
 
-    /**
-     * Returns whether the give path has a FileSystem implementation.
-     *
-     * @param path path
-     * @param conf configuration
-     * @return true if the give path's scheme has a FileSystem implementation,
-     *         false otherwise
-     */
-    public static boolean hasFileSystemImpl(Path path, Configuration conf) {
-        String scheme = path.toUri().getScheme();
-        if (scheme != null) {
-            String fsImpl = conf.get("fs." + scheme + ".impl");
-            if (fsImpl == null) {
-                return false;
-            }
-        }
-        return true;
-    }
-    
-
-
     public static boolean isLocal(PigContext pigContext, Configuration conf) {
         return pigContext.getExecType().isLocal() || conf.getBoolean(PigImplConstants.CONVERTED_TO_LOCAL, false);
     }

Modified: pig/trunk/src/org/apache/pig/parser/QueryParserUtils.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/QueryParserUtils.java?rev=1591297&r1=1591296&r2=1591297&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/QueryParserUtils.java (original)
+++ pig/trunk/src/org/apache/pig/parser/QueryParserUtils.java Wed Apr 30 14:00:20 2014
@@ -31,6 +31,7 @@ import org.antlr.runtime.CommonTokenStre
 import org.antlr.runtime.RecognitionException;
 import org.antlr.runtime.tree.CommonTree;
 import org.antlr.runtime.tree.Tree;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.PigConfiguration;
@@ -38,6 +39,8 @@ import org.apache.pig.StoreFuncInterface
 import org.apache.pig.backend.datastorage.ContainerDescriptor;
 import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.backend.datastorage.ElementDescriptor;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
@@ -56,13 +59,13 @@ public class QueryParserUtils {
             return str;
     }
 
-    public static void attachStorePlan(String scope, LogicalPlan lp, String fileName, String func, 
+    public static void attachStorePlan(String scope, LogicalPlan lp, String fileName, String func,
             Operator input, String alias, PigContext pigContext) throws FrontendException {
         func = func == null ? pigContext.getProperties().getProperty(PigConfiguration.PIG_DEFAULT_STORE_FUNC, PigStorage.class.getName()) : func;
 
         FuncSpec funcSpec = new FuncSpec( func );
         StoreFuncInterface stoFunc = (StoreFuncInterface)PigContext.instantiateFuncFromSpec( funcSpec );
-        
+
         fileName = removeQuotes( fileName );
         FileSpec fileSpec = new FileSpec( fileName, funcSpec );
         String sig = alias + "_" + LogicalPlanBuilder.newOperatorKey(scope);
@@ -87,25 +90,23 @@ public class QueryParserUtils {
         ElementDescriptor el = dfs.asElement(desc);
         return new Path(el.toString());
     }
-    
+
     static void setHdfsServers(String absolutePath, PigContext pigContext) throws URISyntaxException {
         // Get native host
         String defaultFS = (String)pigContext.getProperties().get("fs.default.name");
         if (defaultFS==null)
             defaultFS = (String)pigContext.getProperties().get("fs.defaultFS");
-        
+
         URI defaultFSURI = new URI(defaultFS);
-        String defaultHost = defaultFSURI.getHost();
-        if (defaultHost == null) defaultHost = "";
-                
-        defaultHost = defaultHost.toLowerCase();
-    
-        Set<String> remoteHosts = getRemoteHosts(absolutePath, defaultHost);
-                    
+
+        Configuration conf = new Configuration(true);
+        ConfigurationUtil.mergeConf(conf, ConfigurationUtil.toConfiguration(pigContext.getProperties()));
+        Set<String> remoteHosts = getRemoteHosts(absolutePath, defaultFSURI, conf);
+
         String hdfsServersString = (String)pigContext.getProperties().get("mapreduce.job.hdfs-servers");
         if (hdfsServersString == null) hdfsServersString = "";
         String hdfsServers[] = hdfsServersString.split(",");
-                    
+
         for (String remoteHost : remoteHosts) {
             boolean existing = false;
             for (String hdfsServer : hdfsServers) {
@@ -120,43 +121,50 @@ public class QueryParserUtils {
                 hdfsServersString = hdfsServersString + remoteHost;
             }
         }
-    
+
         if (!hdfsServersString.isEmpty()) {
             pigContext.getProperties().setProperty("mapreduce.job.hdfs-servers", hdfsServersString);
         }
     }
 
-     static Set<String> getRemoteHosts(String absolutePath, String defaultHost) {
-         String HAR_PREFIX = "hdfs-";
-         Set<String> result = new HashSet<String>();
-         String[] fnames = absolutePath.split(",");
-         for (String fname: fnames) {
-             // remove leading/trailing whitespace(s)
-             fname = fname.trim();
-             Path p = new Path(fname);
-             URI uri = p.toUri();
-             if(uri.isAbsolute()) {
-                 String scheme = uri.getScheme();
-                 if (scheme!=null && scheme.toLowerCase().equals("hdfs")||scheme.toLowerCase().equals("har")) {
-                     if (uri.getHost()==null)
-                         continue;
-                     String thisHost = uri.getHost().toLowerCase();
-                     if (scheme.toLowerCase().equals("har")) {
-                         if (thisHost.startsWith(HAR_PREFIX)) {
-                             thisHost = thisHost.substring(HAR_PREFIX.length());
-                         }
-                     }
-                     if (!uri.getHost().isEmpty() && 
-                             !thisHost.equals(defaultHost)) {
-                         if (uri.getPort()!=-1)
-                             result.add("hdfs://"+thisHost+":"+uri.getPort());
-                         else
-                             result.add("hdfs://"+thisHost);
-                     }
-                 }
-             }
-         }
-         return result;
+    static Set<String> getRemoteHosts(String absolutePath, URI defaultFSURI, Configuration conf) {
+        String defaultHost = defaultFSURI.getHost() ==  null ? "" : defaultFSURI.getHost().toLowerCase();
+        String defaultScheme = defaultFSURI.getScheme() == null ? "" : defaultFSURI.getScheme().toLowerCase();
+
+        Set<String> result = new HashSet<String>();
+        String[] fnames = absolutePath.split(",");
+        for (String fname : fnames) {
+            // remove leading/trailing whitespace(s)
+            Path path = new Path(fname.trim());
+            URI uri = path.toUri();
+            if (uri.isAbsolute()) { // If it has scheme
+                String thisHost = uri.getHost() == null ? "" : uri.getHost().toLowerCase();
+                String scheme = uri.getScheme().toLowerCase();
+                // If host and scheme are same continue
+                if (scheme.equals(defaultScheme) && (thisHost.equals(defaultHost) || thisHost.isEmpty())) {
+                    continue;
+                }
+                String authority = uri.getAuthority() == null ? "" : uri.getAuthority()
+                        .toLowerCase();
+                if (scheme.equals("har")) {
+                    String[] parts = authority.split("-", 2);
+                    scheme = parts[0];
+                    if (parts.length < 2) {
+                        authority = "";
+                    } else {
+                        authority = parts[1];
+                    }
+                    if (scheme.isEmpty() || (scheme.equals(defaultScheme) &&
+                            authority.equals(defaultFSURI.getAuthority()))) {
+                        continue;
+                    }
+                } else if (!HadoopShims.hasFileSystemImpl(path, conf)) {
+                    continue;
+                }
+                result.add(scheme + "://" + authority);
+            }
+        }
+        return result;
      }
 
      static String constructFileNameSignature(String fileName, FuncSpec funcSpec) {
@@ -218,11 +226,11 @@ public class QueryParserUtils {
 
         return null;
     }
-    
+
     static QueryParser createParser(CommonTokenStream tokens) {
         return createParser(tokens, 0);
     }
-    
+
     static QueryParser createParser(CommonTokenStream tokens, int lineOffset) {
         QueryParser parser = new QueryParser(tokens);
         PigParserNodeAdaptor adaptor = new PigParserNodeAdaptor(

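A worked example of the new har:// handling in getRemoteHosts above: a har
URI embeds the underlying filesystem in its authority as
"<scheme>-<authority>", so splitting on the first "-" recovers both parts
(sketch):

    URI uri = new Path("har://hdfs-nn1:8020/archive.har/part").toUri();
    String[] parts = uri.getAuthority().split("-", 2); // ["hdfs", "nn1:8020"]
    // Recorded as remote server "hdfs://nn1:8020", unless scheme and
    // authority match the default filesystem, in which case it is skipped.
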
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java?rev=1591297&r1=1591296&r2=1591297&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java Wed Apr 30 14:00:20 2014
@@ -346,7 +346,7 @@ public abstract class JobStats extends O
 
         for (String className : readerNames.split(",")) {
             reader = (PigStatsOutputSizeReader) PigContext.instantiateFuncFromSpec(className);
-            if (reader.supports(sto)) {
+            if (reader.supports(sto, conf)) {
                 LOG.info("using output size reader: " + className);
                 try {
                     return reader.getOutputSize(sto, conf);

Added: pig/trunk/test/org/apache/pig/parser/TestQueryParserUtils.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/parser/TestQueryParserUtils.java?rev=1591297&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/parser/TestQueryParserUtils.java (added)
+++ pig/trunk/test/org/apache/pig/parser/TestQueryParserUtils.java Wed Apr 30 14:00:20 2014
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.parser;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Properties;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.test.Util;
+import org.junit.Test;
+
+public class TestQueryParserUtils {
+
+    @Test
+    public void testSetHDFSServers() throws Exception {
+        Properties props = new Properties();
+        props.setProperty("fs.default.name", "hdfs://nn1:8020/tmp");
+        PigContext pc = new PigContext(ExecType.LOCAL, props);
+
+        //No scheme/host
+        QueryParserUtils.setHdfsServers("hdfs:///tmp", pc);
+        assertEquals(null, props.getProperty("mapreduce.job.hdfs-servers"));
+        QueryParserUtils.setHdfsServers("/tmp", pc);
+        assertEquals(null, props.getProperty("mapreduce.job.hdfs-servers"));
+        QueryParserUtils.setHdfsServers("tmp", pc);
+        assertEquals(null, props.getProperty("mapreduce.job.hdfs-servers"));
+
+        // Same as default host and scheme
+        QueryParserUtils.setHdfsServers("hdfs://nn1/tmp", pc);
+        assertEquals(null, props.getProperty("mapreduce.job.hdfs-servers"));
+        QueryParserUtils.setHdfsServers("hdfs://nn1:8020/tmp", pc);
+        assertEquals(null, props.getProperty("mapreduce.job.hdfs-servers"));
+
+        // Same host different scheme
+        QueryParserUtils.setHdfsServers("hftp://nn1/tmp", pc);
+        assertEquals("hftp://nn1", props.getProperty("mapreduce.job.hdfs-servers"));
+        QueryParserUtils.setHdfsServers("hftp://nn1:50070/tmp", pc);
+        assertEquals("hftp://nn1,hftp://nn1:50070", props.getProperty("mapreduce.job.hdfs-servers"));
+        // There should be no duplicates
+        QueryParserUtils.setHdfsServers("hftp://nn1:50070/tmp", pc);
+        assertEquals("hftp://nn1,hftp://nn1:50070", props.getProperty("mapreduce.job.hdfs-servers"));
+
+        // har
+        props.remove("mapreduce.job.hdfs-servers");
+        QueryParserUtils.setHdfsServers("har:///tmp", pc);
+        assertEquals(null, props.getProperty("mapreduce.job.hdfs-servers"));
+        QueryParserUtils.setHdfsServers("har://hdfs-nn1:8020/tmp", pc);
+        assertEquals(null, props.getProperty("mapreduce.job.hdfs-servers"));
+        QueryParserUtils.setHdfsServers("har://hdfs-nn1/tmp", pc);
+        assertEquals("hdfs://nn1", props.getProperty("mapreduce.job.hdfs-servers"));
+
+        // Non existing filesystem scheme
+        props.remove("mapreduce.job.hdfs-servers");
+        QueryParserUtils.setHdfsServers("hello://nn1/tmp", pc);
+        assertEquals(null, props.getProperty("mapreduce.job.hdfs-servers"));
+
+        if(Util.isHadoop23() || Util.isHadoop2_0()) {
+            // webhdfs
+            props.remove("mapreduce.job.hdfs-servers");
+            QueryParserUtils.setHdfsServers("webhdfs://nn1/tmp", pc);
+            assertEquals("webhdfs://nn1", props.getProperty("mapreduce.job.hdfs-servers"));
+            QueryParserUtils.setHdfsServers("webhdfs://nn1:50070/tmp", pc);
+            assertEquals("webhdfs://nn1,webhdfs://nn1:50070", props.getProperty("mapreduce.job.hdfs-servers"));
+
+            // har with webhdfs
+            QueryParserUtils.setHdfsServers("har://webhdfs-nn1:50070/tmp", pc);
+            assertEquals("webhdfs://nn1,webhdfs://nn1:50070", props.getProperty("mapreduce.job.hdfs-servers"));
+            QueryParserUtils.setHdfsServers("har://webhdfs-nn2:50070/tmp", pc);
+            assertEquals("webhdfs://nn1,webhdfs://nn1:50070,webhdfs://nn2:50070", props.getProperty("mapreduce.job.hdfs-servers"));
+            props.remove("mapreduce.job.hdfs-servers");
+            QueryParserUtils.setHdfsServers("har://webhdfs-nn1/tmp", pc);
+            assertEquals("webhdfs://nn1", props.getProperty("mapreduce.job.hdfs-servers"));
+
+            //viewfs
+            props.remove("mapreduce.job.hdfs-servers");
+            QueryParserUtils.setHdfsServers("viewfs:/tmp", pc);
+            assertEquals("viewfs://", props.getProperty("mapreduce.job.hdfs-servers"));
+            QueryParserUtils.setHdfsServers("viewfs:///tmp", pc);
+            assertEquals("viewfs://", props.getProperty("mapreduce.job.hdfs-servers"));
+            QueryParserUtils.setHdfsServers("viewfs://cluster1/tmp", pc);
+            assertEquals("viewfs://,viewfs://cluster1", props.getProperty("mapreduce.job.hdfs-servers"));
+
+            //har with viewfs
+            props.remove("mapreduce.job.hdfs-servers");
+            QueryParserUtils.setHdfsServers("har://viewfs/tmp", pc);
+            assertEquals("viewfs://", props.getProperty("mapreduce.job.hdfs-servers"));
+            QueryParserUtils.setHdfsServers("har://viewfs-cluster1/tmp", pc);
+            assertEquals("viewfs://,viewfs://cluster1", props.getProperty("mapreduce.job.hdfs-servers"));
+
+
+        }
+
+
+    }
+
+}

Modified: pig/trunk/test/org/apache/pig/test/TestMRJobStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMRJobStats.java?rev=1591297&r1=1591296&r2=1591297&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestMRJobStats.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestMRJobStats.java Wed Apr 30 14:00:20 2014
@@ -25,7 +25,6 @@ import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.Properties;
 
@@ -190,7 +189,7 @@ public class TestMRJobStats {
          * @param sto POStore
          */
         @Override
-        public boolean supports(POStore sto) {
+        public boolean supports(POStore sto, Configuration conf) {
             return true;
         }