You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2013/08/01 07:34:56 UTC

svn commit: r1509081 - in /hive/trunk/ql/src: java/org/apache/hadoop/hive/ql/ java/org/apache/hadoop/hive/ql/exec/ java/org/apache/hadoop/hive/ql/exec/mr/ java/org/apache/hadoop/hive/ql/io/rcfile/merge/ test/org/apache/hadoop/hive/ql/io/

Author: gunther
Date: Thu Aug  1 05:34:55 2013
New Revision: 1509081

URL: http://svn.apache.org/r1509081
Log:
HIVE-4843: Refactoring MapRedTask and ExecDriver for better re-usability and readability (Vikram Dixit K via Gunther Hagleitner)

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java?rev=1509081&r1=1509080&r2=1509081&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Context.java Thu Aug  1 05:34:55 2013
@@ -563,6 +563,10 @@ public class Context {
     pathToCS.put(path, cs);
   }
 
+  public ContentSummary getCS(Path path) {
+    return getCS(path.toString());
+  }
+
   public ContentSummary getCS(String path) {
     return pathToCS.get(path);
   }
@@ -575,7 +579,6 @@ public class Context {
     return conf;
   }
 
-
   /**
    * Given a mapping from paths to objects, localize any MR tmp paths
    * @param map mapping from paths to objects

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1509081&r1=1509080&r2=1509081&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Thu Aug  1 05:34:55 2013
@@ -63,6 +63,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -108,6 +109,7 @@ import org.apache.hadoop.hive.ql.io.Hive
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
+import org.apache.hadoop.hive.ql.io.OneNullRowInputFormat;
 import org.apache.hadoop.hive.ql.io.RCFile;
 import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -124,6 +126,7 @@ import org.apache.hadoop.hive.ql.plan.Fi
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
@@ -131,12 +134,10 @@ import org.apache.hadoop.hive.ql.plan.Re
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.api.Adjacency;
 import org.apache.hadoop.hive.ql.plan.api.Graph;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBaseCompare;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotEqual;
@@ -150,9 +151,11 @@ import org.apache.hadoop.hive.shims.Shim
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
@@ -429,6 +432,7 @@ public final class Utilities {
       return new Expression(dateVal, dateVal.getClass(), "new", args);
     }
 
+    @Override
     protected boolean mutatesTo(Object oldInstance, Object newInstance) {
       if (oldInstance == null || newInstance == null) {
         return false;
@@ -442,6 +446,7 @@ public final class Utilities {
    * it is not serialization friendly.
    */
   public static class TimestampPersistenceDelegate extends DatePersistenceDelegate {
+    @Override
     protected void initialize(Class<?> type, Object oldInstance, Object newInstance, Encoder out) {
       Timestamp ts = (Timestamp)oldInstance;
       Object[] args = { ts.getNanos() };
@@ -451,21 +456,21 @@ public final class Utilities {
   }
 
   public static void setMapRedWork(Configuration conf, MapredWork w, String hiveScratchDir) {
-    setMapWork(conf, w.getMapWork(), hiveScratchDir);
+    setMapWork(conf, w.getMapWork(), hiveScratchDir, true);
     if (w.getReduceWork() != null) {
-      setReduceWork(conf, w.getReduceWork(), hiveScratchDir);
+      setReduceWork(conf, w.getReduceWork(), hiveScratchDir, true);
     }
   }
 
-  public static void setMapWork(Configuration conf, MapWork w, String hiveScratchDir) {
-    setBaseWork(conf, w, hiveScratchDir, MAP_PLAN_NAME);
+  public static Path setMapWork(Configuration conf, MapWork w, String hiveScratchDir, boolean useCache) {
+    return setBaseWork(conf, w, hiveScratchDir, MAP_PLAN_NAME, useCache);
   }
 
-  public static void setReduceWork(Configuration conf, ReduceWork w, String hiveScratchDir) {
-    setBaseWork(conf, w, hiveScratchDir, REDUCE_PLAN_NAME);
+  public static Path setReduceWork(Configuration conf, ReduceWork w, String hiveScratchDir, boolean useCache) {
+    return setBaseWork(conf, w, hiveScratchDir, REDUCE_PLAN_NAME, useCache);
   }
 
-  private static void setBaseWork(Configuration conf, BaseWork w, String hiveScratchDir, String name) {
+  private static Path setBaseWork(Configuration conf, BaseWork w, String hiveScratchDir, String name, boolean useCache) {
     try {
       setPlanPath(conf, hiveScratchDir);
 
@@ -479,7 +484,7 @@ public final class Utilities {
       // Serialize the plan to the default hdfs instance
       // Except for hadoop local mode execution where we should be
       // able to get the plan directly from the cache
-      if (!ShimLoader.getHadoopShims().isLocalMode(conf)) {
+      if (useCache && !ShimLoader.getHadoopShims().isLocalMode(conf)) {
         // Set up distributed cache
         if (!DistributedCache.getSymlink(conf)) {
           DistributedCache.createSymlink(conf);
@@ -495,6 +500,8 @@ public final class Utilities {
 
       // Cache the plan in this process
       gWorkMap.put(planPath, w);
+
+      return planPath;
     } catch (Exception e) {
       e.printStackTrace();
       throw new RuntimeException(e);
@@ -609,6 +616,7 @@ public final class Utilities {
    * De-serialize an object. This helper function mainly makes sure that enums,
    * counters, etc are handled properly.
    */
+  @SuppressWarnings("unchecked")
   public static <T> T deserializeObject(InputStream in) {
     XMLDecoder d = null;
     try {
@@ -1778,7 +1786,7 @@ public final class Utilities {
     }
   }
 
-  public static Object getInputSummaryLock = new Object();
+  public static Object INPUT_SUMMARY_LOCK = new Object();
 
   /**
    * Calculate the total size of input files.
@@ -1801,7 +1809,7 @@ public final class Utilities {
 
     // Since multiple threads could call this method concurrently, locking
     // this method will avoid number of threads out of control.
-    synchronized (getInputSummaryLock) {
+    synchronized (INPUT_SUMMARY_LOCK) {
       // For each input path, calculate the total size.
       for (String path : work.getPathToAliases().keySet()) {
         Path p = new Path(path);
@@ -1912,7 +1920,7 @@ public final class Utilities {
                 throw new IOException(e);
               }
             } while (!executorDone);
-    }
+          }
           executor.shutdown();
         }
         HiveInterruptUtils.checkInterrupted();
@@ -1936,7 +1944,7 @@ public final class Utilities {
     }
   }
 
-  public static boolean isEmptyPath(JobConf job, String dirPath, Context ctx)
+  public static boolean isEmptyPath(JobConf job, Path dirPath, Context ctx)
       throws Exception {
     ContentSummary cs = ctx.getCS(dirPath);
     if (cs != null) {
@@ -1946,8 +1954,7 @@ public final class Utilities {
     } else {
       LOG.info("Content Summary not cached for " + dirPath);
     }
-    Path p = new Path(dirPath);
-    return isEmptyPath(job, p);
+    return isEmptyPath(job, dirPath);
   }
 
   public static boolean isEmptyPath(JobConf job, Path dirPath) throws Exception {
@@ -2532,5 +2539,415 @@ public final class Utilities {
 
     return sb.toString();
   }
+
+  /**
+   * Estimate the number of reducers needed for this job, based on job input,
+   * and configuration parameters.
+   *
+   * The output of this method should only be used if the output of this
+   * MapRedTask is not being used to populate a bucketed table and the user
+   * has not specified the number of reducers to use.
+   *
+   * @return the number of reducers.
+   */
+  public static int estimateNumberOfReducers(HiveConf conf, ContentSummary inputSummary,
+                                             MapWork work, boolean finalMapRed) throws IOException {
+    long bytesPerReducer = conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);
+    int maxReducers = conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
+
+    double samplePercentage = getHighestSamplePercentage(work);
+    long totalInputFileSize = getTotalInputFileSize(inputSummary, work, samplePercentage);
+
+    // if all inputs are sampled, we should shrink the size of reducers accordingly.
+    if (totalInputFileSize != inputSummary.getLength()) {
+      LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
+          + maxReducers + " estimated totalInputFileSize=" + totalInputFileSize);
+    } else {
+      LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
+        + maxReducers + " totalInputFileSize=" + totalInputFileSize);
+    }
+
+    int reducers = (int) ((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
+    reducers = Math.max(1, reducers);
+    reducers = Math.min(maxReducers, reducers);
+
+    // If this map reduce job writes final data to a table and bucketing is being inferred,
+    // and the user has configured Hive to do this, make sure the number of reducers is a
+    // power of two
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_INFER_BUCKET_SORT_NUM_BUCKETS_POWER_TWO) &&
+        finalMapRed && !work.getBucketedColsByDirectory().isEmpty()) {
+
+      int reducersLog = (int)(Math.log(reducers) / Math.log(2)) + 1;
+      int reducersPowerTwo = (int)Math.pow(2, reducersLog);
+
+      // If the original number of reducers was a power of two, use that
+      if (reducersPowerTwo / 2 == reducers) {
+        return reducers;
+      } else if (reducersPowerTwo > maxReducers) {
+        // If the next power of two greater than the original number of reducers is greater
+        // than the max number of reducers, use the preceding power of two, which is strictly
+        // less than the original number of reducers and hence the max
+        reducers = reducersPowerTwo / 2;
+      } else {
+        // Otherwise use the smallest power of two greater than the original number of reducers
+        reducers = reducersPowerTwo;
+      }
+    }
+
+    return reducers;
+  }
+
+  /**
+   * Computes the total input file size. If block sampling was used it will scale this
+   * value by the highest sample percentage (as an estimate for input).
+   *
+   * @param inputSummary
+   * @param work
+   * @param highestSamplePercentage
+   * @return estimated total input size for job
+   */
+  public static long getTotalInputFileSize (ContentSummary inputSummary, MapWork work,
+      double highestSamplePercentage) {
+    long totalInputFileSize = inputSummary.getLength();
+    if (work.getNameToSplitSample() == null || work.getNameToSplitSample().isEmpty()) {
+      // If percentage block sampling wasn't used, we don't need to do any estimation
+      return totalInputFileSize;
+    }
+
+    if (highestSamplePercentage >= 0) {
+      totalInputFileSize = Math.min((long) (totalInputFileSize * highestSamplePercentage / 100D)
+          , totalInputFileSize);
+    }
+    return totalInputFileSize;
+  }
+
+  /**
+   * Computes the total number of input files. If block sampling was used it will scale this
+   * value by the highest sample percentage (as an estimate for # input files).
+   *
+   * @param inputSummary
+   * @param work
+   * @param highestSamplePercentage
+   * @return
+   */
+  public static long getTotalInputNumFiles (ContentSummary inputSummary, MapWork work,
+      double highestSamplePercentage) {
+    long totalInputNumFiles = inputSummary.getFileCount();
+    if (work.getNameToSplitSample() == null || work.getNameToSplitSample().isEmpty()) {
+      // If percentage block sampling wasn't used, we don't need to do any estimation
+      return totalInputNumFiles;
+    }
+
+    if (highestSamplePercentage >= 0) {
+      totalInputNumFiles = Math.min((long) (totalInputNumFiles * highestSamplePercentage / 100D)
+          , totalInputNumFiles);
+    }
+    return totalInputNumFiles;
+  }
+
+  /**
+   * Returns the highest sample percentage of any alias in the given MapWork
+   */
+  public static double getHighestSamplePercentage (MapWork work) {
+    double highestSamplePercentage = 0;
+    for (String alias : work.getAliasToWork().keySet()) {
+      if (work.getNameToSplitSample().containsKey(alias)) {
+        Double rate = work.getNameToSplitSample().get(alias).getPercent();
+        if (rate != null && rate > highestSamplePercentage) {
+          highestSamplePercentage = rate;
+        }
+      } else {
+        highestSamplePercentage = -1;
+        break;
+      }
+    }
+
+    return highestSamplePercentage;
+  }
+
+  /**
+   * Computes a list of all input paths needed to compute the given MapWork. All aliases
+   * are considered and a merged list of input paths is returned. If any input path points
+   * to an empty table or partition a dummy file in the scratch dir is instead created and
+   * added to the list. This is needed to avoid special casing the operator pipeline for
+   * these cases.
+   *
+   * @param job JobConf used to run the job
+   * @param work MapWork encapsulating the info about the task
+   * @param hiveScratchDir The tmp dir used to create dummy files if needed
+   * @param ctx Context object
+   * @return List of paths to process for the given MapWork
+   * @throws Exception
+   */
+  public static List<Path> getInputPaths(JobConf job, MapWork work, String hiveScratchDir, Context ctx)
+      throws Exception {
+    int sequenceNumber = 0;
+
+    Set<Path> pathsProcessed = new HashSet<Path>();
+    List<Path> pathsToAdd = new LinkedList<Path>();
+    // AliasToWork contains all the aliases
+    for (String alias : work.getAliasToWork().keySet()) {
+      LOG.info("Processing alias " + alias);
+
+      // The alias may not have any path
+      Path path = null;
+      for (String file : new LinkedList<String>(work.getPathToAliases().keySet())) {
+        List<String> aliases = work.getPathToAliases().get(file);
+        if (aliases.contains(alias)) {
+          path = new Path(file);
+
+          // Multiple aliases can point to the same path - it should be
+          // processed only once
+          if (pathsProcessed.contains(path)) {
+            continue;
+          }
+
+          pathsProcessed.add(path);
+
+          LOG.info("Adding input file " + path);
+          if (isEmptyPath(job, path, ctx)) {
+            path = createDummyFileForEmptyPartition(path, job, work,
+                 hiveScratchDir, alias, sequenceNumber++);
+
+          }
+          pathsToAdd.add(path);
+        }
+      }
+
+      // If the query references non-existent partitions
+      // We need to add a empty file, it is not acceptable to change the
+      // operator tree
+      // Consider the query:
+      // select * from (select count(1) from T union all select count(1) from
+      // T2) x;
+      // If T is empty and T2 contains 100 rows, the user expects: 0, 100 (2
+      // rows)
+      if (path == null) {
+        path = createDummyFileForEmptyTable(job, work, hiveScratchDir,
+            alias, sequenceNumber++);
+        pathsToAdd.add(path);
+      }
+    }
+    return pathsToAdd;
+  }
+
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  private static Path createEmptyFile(String hiveScratchDir,
+      Class<? extends HiveOutputFormat> outFileFormat, JobConf job,
+      int sequenceNumber, Properties props, boolean dummyRow)
+          throws IOException, InstantiationException, IllegalAccessException {
+
+    // create a dummy empty file in a new directory
+    String newDir = hiveScratchDir + File.separator + sequenceNumber;
+    Path newPath = new Path(newDir);
+    FileSystem fs = newPath.getFileSystem(job);
+    fs.mkdirs(newPath);
+    //Qualify the path against the file system. The user configured path might contain default port which is skipped
+    //in the file status. This makes sure that all paths which goes into PathToPartitionInfo are always listed status
+    //file path.
+    newPath = fs.makeQualified(newPath);
+    String newFile = newDir + File.separator + "emptyFile";
+    Path newFilePath = new Path(newFile);
+
+    String onefile = newPath.toString();
+    RecordWriter recWriter = outFileFormat.newInstance().getHiveRecordWriter(job, newFilePath,
+        Text.class, false, props, null);
+    if (dummyRow) {
+      // empty files are omitted at CombineHiveInputFormat.
+      // for meta-data only query, it effectively makes partition columns disappear..
+      // this could be fixed by other methods, but this seemed to be the most easy (HIVEV-2955)
+      recWriter.write(new Text("empty"));  // written via HiveIgnoreKeyTextOutputFormat
+    }
+    recWriter.close(false);
+
+    return newPath;
+  }
+
+  @SuppressWarnings("rawtypes")
+  private static Path createDummyFileForEmptyPartition(Path path, JobConf job, MapWork work,
+      String hiveScratchDir, String alias, int sequenceNumber)
+          throws IOException, InstantiationException, IllegalAccessException {
+
+    String strPath = path.toString();
+
+    // The input file does not exist, replace it by a empty file
+    PartitionDesc partDesc = work.getPathToPartitionInfo().get(strPath);
+    boolean nonNative = partDesc.getTableDesc().isNonNative();
+    boolean oneRow = partDesc.getInputFileFormatClass() == OneNullRowInputFormat.class;
+    Properties props = partDesc.getProperties();
+    Class<? extends HiveOutputFormat> outFileFormat = partDesc.getOutputFileFormatClass();
+
+    if (nonNative) {
+      // if this isn't a hive table we can't create an empty file for it.
+      return path;
+    }
+
+    Path newPath = createEmptyFile(hiveScratchDir, outFileFormat, job,
+        sequenceNumber, props, oneRow);
+
+
+    LOG.info("Changed input file to " + newPath);
+
+    // update the work
+    String strNewPath = newPath.toString();
+
+    LinkedHashMap<String, ArrayList<String>> pathToAliases = work.getPathToAliases();
+    pathToAliases.put(strNewPath, pathToAliases.get(strPath));
+    pathToAliases.remove(strPath);
+
+    work.setPathToAliases(pathToAliases);
+
+    LinkedHashMap<String, PartitionDesc> pathToPartitionInfo = work.getPathToPartitionInfo();
+    pathToPartitionInfo.put(strNewPath, pathToPartitionInfo.get(strPath));
+    pathToPartitionInfo.remove(strPath);
+    work.setPathToPartitionInfo(pathToPartitionInfo);
+
+    return newPath;
+  }
+
+  @SuppressWarnings("rawtypes")
+  private static Path createDummyFileForEmptyTable(JobConf job, MapWork work,
+      String hiveScratchDir, String alias, int sequenceNumber)
+          throws IOException, InstantiationException, IllegalAccessException {
+
+    TableDesc tableDesc = work.getAliasToPartnInfo().get(alias).getTableDesc();
+    Properties props = tableDesc.getProperties();
+    boolean nonNative = tableDesc.isNonNative();
+    Class<? extends HiveOutputFormat> outFileFormat = tableDesc.getOutputFileFormatClass();
+
+    if (nonNative) {
+      // if this isn't a hive table we can't create an empty file for it.
+      return null;
+    }
+
+    Path newPath = createEmptyFile(hiveScratchDir, outFileFormat, job,
+        sequenceNumber, props, false);
+
+
+    LOG.info("Changed input file to " + newPath.toString());
+
+    // update the work
+
+    LinkedHashMap<String, ArrayList<String>> pathToAliases = work.getPathToAliases();
+    ArrayList<String> newList = new ArrayList<String>();
+    newList.add(alias);
+    pathToAliases.put(newPath.toUri().toString(), newList);
+
+    work.setPathToAliases(pathToAliases);
+
+    LinkedHashMap<String, PartitionDesc> pathToPartitionInfo = work.getPathToPartitionInfo();
+    PartitionDesc pDesc = work.getAliasToPartnInfo().get(alias).clone();
+    pathToPartitionInfo.put(newPath.toUri().toString(), pDesc);
+    work.setPathToPartitionInfo(pathToPartitionInfo);
+
+    return newPath;
+  }
+
+  /**
+   * setInputPaths add all the paths in the provided list to the Job conf object
+   * as input paths for the job.
+   *
+   * @param job
+   * @param pathsToAdd
+   */
+  public static void setInputPaths(JobConf job, List<Path> pathsToAdd) {
+
+    Path[] addedPaths = FileInputFormat.getInputPaths(job);
+    if (addedPaths == null) {
+      addedPaths = new Path[0];
+    }
+
+    Path[] combined = new Path[addedPaths.length + pathsToAdd.size()];
+    System.arraycopy(addedPaths, 0, combined, 0, addedPaths.length);
+
+    int i = 0;
+    for(Path p: pathsToAdd) {
+      combined[addedPaths.length + (i++)] = p;
+    }
+    FileInputFormat.setInputPaths(job, combined);
+  }
+
+  /**
+   * Set hive input format, and input format file if necessary.
+   */
+  public static void setInputAttributes(Configuration conf, MapWork mWork) {
+    if (mWork.getInputformat() != null) {
+      HiveConf.setVar(conf, HiveConf.ConfVars.HIVEINPUTFORMAT, mWork.getInputformat());
+    }
+    if (mWork.getIndexIntermediateFile() != null) {
+      conf.set("hive.index.compact.file", mWork.getIndexIntermediateFile());
+      conf.set("hive.index.blockfilter.file", mWork.getIndexIntermediateFile());
+    }
+
+    // Intentionally overwrites anything the user may have put here
+    conf.setBoolean("hive.input.format.sorted", mWork.isInputFormatSorted());
+  }
+
+  /**
+   * Hive uses tmp directories to capture the output of each FileSinkOperator.
+   * This method creates all necessary tmp directories for FileSinks in the Mapwork.
+   *
+   * @param conf Used to get the right FileSystem
+   * @param mWork Used to find FileSinkOperators
+   * @throws IOException
+   */
+  public static void createTmpDirs(Configuration conf, MapWork mWork)
+      throws IOException {
+
+    Map<String, ArrayList<String>> pa = mWork.getPathToAliases();
+    if (pa != null) {
+      List<Operator<? extends OperatorDesc>> ops =
+        new ArrayList<Operator<? extends OperatorDesc>>();
+      for (List<String> ls : pa.values()) {
+        for (String a : ls) {
+          ops.add(mWork.getAliasToWork().get(a));
+        }
+      }
+      createTmpDirs(conf, ops);
+    }
+  }
+
+  /**
+   * Hive uses tmp directories to capture the output of each FileSinkOperator.
+   * This method creates all necessary tmp directories for FileSinks in the ReduceWork.
+   *
+   * @param conf Used to get the right FileSystem
+   * @param rWork Used to find FileSinkOperators
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  public static void createTmpDirs(Configuration conf, ReduceWork rWork)
+      throws IOException {
+    if (rWork == null) {
+      return;
+    }
+    List<Operator<? extends OperatorDesc>> ops
+      = new LinkedList<Operator<? extends OperatorDesc>>();
+    ops.add(rWork.getReducer());
+    createTmpDirs(conf, ops);
+  }
+
+  private static void createTmpDirs(Configuration conf,
+      List<Operator<? extends OperatorDesc>> ops) throws IOException {
+
+    while (!ops.isEmpty()) {
+      Operator<? extends OperatorDesc> op = ops.remove(0);
+
+      if (op instanceof FileSinkOperator) {
+        FileSinkDesc fdesc = ((FileSinkOperator) op).getConf();
+        String tempDir = fdesc.getDirName();
+
+        if (tempDir != null) {
+          Path tempPath = Utilities.toTempPath(new Path(tempDir));
+          FileSystem fs = tempPath.getFileSystem(conf);
+          fs.mkdirs(tempPath);
+        }
+      }
+
+      if (op.getChildOperators() != null) {
+        ops.addAll(op.getChildOperators());
+      }
+    }
+  }
 }
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1509081&r1=1509080&r2=1509081&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java Thu Aug  1 05:34:55 2013
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hive.ql.exec.mr;
 
-import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -29,13 +28,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Enumeration;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
@@ -55,8 +49,6 @@ import org.apache.hadoop.hive.ql.DriverC
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.FetchOperator;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.exec.HiveTotalOrderPartitioner;
 import org.apache.hadoop.hive.ql.exec.JobCloseFeedBack;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -66,20 +58,16 @@ import org.apache.hadoop.hive.ql.exec.Ta
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveKey;
-import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl;
 import org.apache.hadoop.hive.ql.io.IOPrepareCache;
-import org.apache.hadoop.hive.ql.io.OneNullRowInputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.FetchWork;
-import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
@@ -90,7 +78,6 @@ import org.apache.hadoop.hive.shims.Shim
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.Counters;
-import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
@@ -211,49 +198,10 @@ public class ExecDriver extends Task<Map
     return false;
   }
 
-  protected void createTmpDirs() throws IOException {
-    // fix up outputs
-    Map<String, ArrayList<String>> pa = work.getMapWork().getPathToAliases();
-    if (pa != null) {
-      List<Operator<? extends OperatorDesc>> opList =
-        new ArrayList<Operator<? extends OperatorDesc>>();
-
-      if (work.getReduceWork() != null) {
-        opList.add(work.getReduceWork().getReducer());
-      }
-
-      for (List<String> ls : pa.values()) {
-        for (String a : ls) {
-          opList.add(work.getMapWork().getAliasToWork().get(a));
-
-          while (!opList.isEmpty()) {
-            Operator<? extends OperatorDesc> op = opList.remove(0);
-
-            if (op instanceof FileSinkOperator) {
-              FileSinkDesc fdesc = ((FileSinkOperator) op).getConf();
-              String tempDir = fdesc.getDirName();
-
-              if (tempDir != null) {
-                Path tempPath = Utilities.toTempPath(new Path(tempDir));
-                LOG.info("Making Temp Directory: " + tempDir);
-                FileSystem fs = tempPath.getFileSystem(job);
-                fs.mkdirs(tempPath);
-              }
-            }
-
-            if (op.getChildOperators() != null) {
-              opList.addAll(op.getChildOperators());
-            }
-          }
-        }
-      }
-    }
-  }
-
    /**
    * Execute a query plan using Hadoop.
    */
-  @SuppressWarnings("deprecation")
+  @SuppressWarnings({"deprecation", "unchecked"})
   @Override
   public int execute(DriverContext driverContext) {
 
@@ -430,7 +378,8 @@ public class ExecDriver extends Task<Map
         }
       }
       work.configureJobConf(job);
-      addInputPaths(job, mWork, emptyScratchDirStr, ctx);
+      List<Path> inputPaths = Utilities.getInputPaths(job, mWork, emptyScratchDirStr, ctx);
+      Utilities.setInputPaths(job, inputPaths);
 
       Utilities.setMapRedWork(job, work, ctx.getMRTmpFileURI());
 
@@ -470,7 +419,8 @@ public class ExecDriver extends Task<Map
         }
       }
 
-      this.createTmpDirs();
+      Utilities.createTmpDirs(job, mWork);
+      Utilities.createTmpDirs(job, rWork);
 
       // Finally SUBMIT the JOB!
       rj = jc.submitJob(job);
@@ -658,6 +608,7 @@ public class ExecDriver extends Task<Map
     }
   }
 
+  @SuppressWarnings("unchecked")
   public static void main(String[] args) throws IOException, HiveException {
 
     String planFileName = null;
@@ -834,164 +785,6 @@ public class ExecDriver extends Task<Map
     return w.getReduceWork() != null;
   }
 
-  /**
-   * Handle a empty/null path for a given alias.
-   */
-  private static int addInputPath(String path, JobConf job, MapWork work, String hiveScratchDir,
-      int numEmptyPaths, boolean isEmptyPath, String alias) throws Exception {
-    // either the directory does not exist or it is empty
-    assert path == null || isEmptyPath;
-
-    // The input file does not exist, replace it by a empty file
-    Class<? extends HiveOutputFormat> outFileFormat = null;
-    boolean nonNative = true;
-    boolean oneRow = false;
-    Properties props;
-    if (isEmptyPath) {
-      PartitionDesc partDesc = work.getPathToPartitionInfo().get(path);
-      props = partDesc.getProperties();
-      outFileFormat = partDesc.getOutputFileFormatClass();
-      nonNative = partDesc.getTableDesc().isNonNative();
-      oneRow = partDesc.getInputFileFormatClass() == OneNullRowInputFormat.class;
-    } else {
-      TableDesc tableDesc = work.getAliasToPartnInfo().get(alias).getTableDesc();
-      props = tableDesc.getProperties();
-      outFileFormat = tableDesc.getOutputFileFormatClass();
-      nonNative = tableDesc.isNonNative();
-    }
-
-    if (nonNative) {
-      FileInputFormat.addInputPaths(job, path);
-      LOG.info("Add a non-native table " + path);
-      return numEmptyPaths;
-    }
-
-    // create a dummy empty file in a new directory
-    String newDir = hiveScratchDir + File.separator + (++numEmptyPaths);
-    Path newPath = new Path(newDir);
-    FileSystem fs = newPath.getFileSystem(job);
-    fs.mkdirs(newPath);
-    //Qualify the path against the filesystem. The user configured path might contain default port which is skipped
-    //in the file status. This makes sure that all paths which goes into PathToPartitionInfo are always listed status
-    //filepath.
-    newPath = fs.makeQualified(newPath);
-    String newFile = newDir + File.separator + "emptyFile";
-    Path newFilePath = new Path(newFile);
-
-    LOG.info("Changed input file to " + newPath.toString());
-
-    // toggle the work
-
-    LinkedHashMap<String, ArrayList<String>> pathToAliases = work.getPathToAliases();
-
-    if (isEmptyPath) {
-      assert path != null;
-      pathToAliases.put(newPath.toUri().toString(), pathToAliases.get(path));
-      pathToAliases.remove(path);
-    } else {
-      assert path == null;
-      ArrayList<String> newList = new ArrayList<String>();
-      newList.add(alias);
-      pathToAliases.put(newPath.toUri().toString(), newList);
-    }
-
-    work.setPathToAliases(pathToAliases);
-
-    LinkedHashMap<String, PartitionDesc> pathToPartitionInfo = work.getPathToPartitionInfo();
-    if (isEmptyPath) {
-      pathToPartitionInfo.put(newPath.toUri().toString(), pathToPartitionInfo.get(path));
-      pathToPartitionInfo.remove(path);
-    } else {
-      PartitionDesc pDesc = work.getAliasToPartnInfo().get(alias).clone();
-      pathToPartitionInfo.put(newPath.toUri().toString(), pDesc);
-    }
-    work.setPathToPartitionInfo(pathToPartitionInfo);
-
-    String onefile = newPath.toString();
-    RecordWriter recWriter = outFileFormat.newInstance().getHiveRecordWriter(job, newFilePath,
-        Text.class, false, props, null);
-    if (oneRow) {
-      // empty files are ommited at CombineHiveInputFormat.
-      // for metadata only query, it effectively makes partition columns disappear..
-      // this could be fixed by other methods, but this seemed to be the most easy (HIVEV-2955)
-      recWriter.write(new Text("empty"));  // written via HiveIgnoreKeyTextOutputFormat
-    }
-    recWriter.close(false);
-    FileInputFormat.addInputPaths(job, onefile);
-    return numEmptyPaths;
-  }
-
-  public static void addInputPaths(JobConf job, MapWork work, String hiveScratchDir, Context ctx)
-      throws Exception {
-    int numEmptyPaths = 0;
-
-    Set<String> pathsProcessed = new HashSet<String>();
-    List<String> pathsToAdd = new LinkedList<String>();
-    // AliasToWork contains all the aliases
-    for (String oneAlias : work.getAliasToWork().keySet()) {
-      LOG.info("Processing alias " + oneAlias);
-      List<String> emptyPaths = new ArrayList<String>();
-
-      // The alias may not have any path
-      String path = null;
-      for (String onefile : work.getPathToAliases().keySet()) {
-        List<String> aliases = work.getPathToAliases().get(onefile);
-        if (aliases.contains(oneAlias)) {
-          path = onefile;
-
-          // Multiple aliases can point to the same path - it should be
-          // processed only once
-          if (pathsProcessed.contains(path)) {
-            continue;
-          }
-
-          pathsProcessed.add(path);
-
-          LOG.info("Adding input file " + path);
-          if (Utilities.isEmptyPath(job, path, ctx)) {
-            emptyPaths.add(path);
-          } else {
-            pathsToAdd.add(path);
-          }
-        }
-      }
-
-      // Create a empty file if the directory is empty
-      for (String emptyPath : emptyPaths) {
-        numEmptyPaths = addInputPath(emptyPath, job, work, hiveScratchDir, numEmptyPaths, true,
-            oneAlias);
-      }
-
-      // If the query references non-existent partitions
-      // We need to add a empty file, it is not acceptable to change the
-      // operator tree
-      // Consider the query:
-      // select * from (select count(1) from T union all select count(1) from
-      // T2) x;
-      // If T is empty and T2 contains 100 rows, the user expects: 0, 100 (2
-      // rows)
-      if (path == null) {
-        numEmptyPaths = addInputPath(null, job, work, hiveScratchDir, numEmptyPaths, false,
-            oneAlias);
-      }
-    }
-    setInputPaths(job, pathsToAdd);
-  }
-
-  private static void setInputPaths(JobConf job, List<String> pathsToAdd) {
-    Path[] addedPaths = FileInputFormat.getInputPaths(job);
-    List<Path> toAddPathList = new ArrayList<Path>();
-    if(addedPaths != null) {
-      for(Path added: addedPaths) {
-        toAddPathList.add(added);
-      }
-    }
-    for(String toAdd: pathsToAdd) {
-      toAddPathList.add(new Path(toAdd));
-    }
-    FileInputFormat.setInputPaths(job, toAddPathList.toArray(new Path[0]));
-  }
-
   @Override
   public StageType getType() {
     return StageType.MAPRED;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java?rev=1509081&r1=1509080&r2=1509081&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java Thu Aug  1 05:34:55 2013
@@ -108,7 +108,9 @@ public class MapRedTask extends ExecDriv
 
         // set the values of totalInputFileSize and totalInputNumFiles, estimating them
         // if percentage block sampling is being used
-        estimateInputSize();
+        double samplePercentage = Utilities.getHighestSamplePercentage(work.getMapWork());
+        totalInputFileSize = Utilities.getTotalInputFileSize(inputSummary, work.getMapWork(), samplePercentage);
+        totalInputNumFiles = Utilities.getTotalInputNumFiles(inputSummary, work.getMapWork(), samplePercentage);
 
         // at this point the number of reducers is precisely defined in the plan
         int numReducers = work.getReduceWork() == null ? 0 : work.getReduceWork().getNumReduceTasks();
@@ -403,7 +405,11 @@ public class MapRedTask extends ExecDriv
             .printInfo("Number of reduce tasks not specified. Defaulting to jobconf value of: "
             + reducers);
       } else {
-        int reducers = estimateNumberOfReducers();
+        if (inputSummary == null) {
+          inputSummary =  Utilities.getInputSummary(driverContext.getCtx(), work.getMapWork(), null);
+        }
+        int reducers = Utilities.estimateNumberOfReducers(conf, inputSummary, work.getMapWork(), 
+                                                          work.isFinalMapRed());
         rWork.setNumReduceTasks(reducers);
         console
             .printInfo("Number of reduce tasks not specified. Estimated from input data size: "
@@ -424,123 +430,6 @@ public class MapRedTask extends ExecDriv
   }
 
   /**
-   * Estimate the number of reducers needed for this job, based on job input,
-   * and configuration parameters.
-   *
-   * The output of this method should only be used if the output of this
-   * MapRedTask is not being used to populate a bucketed table and the user
-   * has not specified the number of reducers to use.
-   *
-   * @return the number of reducers.
-   */
-  private int estimateNumberOfReducers() throws IOException {
-    long bytesPerReducer = conf.getLongVar(HiveConf.ConfVars.BYTESPERREDUCER);
-    int maxReducers = conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
-
-    if(inputSummary == null) {
-      // compute the summary and stash it away
-      inputSummary =  Utilities.getInputSummary(driverContext.getCtx(), work.getMapWork(), null);
-    }
-
-    // if all inputs are sampled, we should shrink the size of reducers accordingly.
-    estimateInputSize();
-
-    if (totalInputFileSize != inputSummary.getLength()) {
-      LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
-          + maxReducers + " estimated totalInputFileSize=" + totalInputFileSize);
-    } else {
-      LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers="
-        + maxReducers + " totalInputFileSize=" + totalInputFileSize);
-    }
-
-    int reducers = (int) ((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
-    reducers = Math.max(1, reducers);
-    reducers = Math.min(maxReducers, reducers);
-
-    // If this map reduce job writes final data to a table and bucketing is being inferred,
-    // and the user has configured Hive to do this, make sure the number of reducers is a
-    // power of two
-    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_INFER_BUCKET_SORT_NUM_BUCKETS_POWER_TWO) &&
-        work.isFinalMapRed() && !work.getMapWork().getBucketedColsByDirectory().isEmpty()) {
-
-      int reducersLog = (int)(Math.log(reducers) / Math.log(2)) + 1;
-      int reducersPowerTwo = (int)Math.pow(2, reducersLog);
-
-      // If the original number of reducers was a power of two, use that
-      if (reducersPowerTwo / 2 == reducers) {
-        return reducers;
-      } else if (reducersPowerTwo > maxReducers) {
-        // If the next power of two greater than the original number of reducers is greater
-        // than the max number of reducers, use the preceding power of two, which is strictly
-        // less than the original number of reducers and hence the max
-        reducers = reducersPowerTwo / 2;
-      } else {
-        // Otherwise use the smallest power of two greater than the original number of reducers
-        reducers = reducersPowerTwo;
-      }
-    }
-
-    return reducers;
-  }
-
-  /**
-   * Sets the values of totalInputFileSize and totalInputNumFiles.  If percentage
-   * block sampling is used, these values are estimates based on the highest
-   * percentage being used for sampling multiplied by the value obtained from the
-   * input summary.  Otherwise, these values are set to the exact value obtained
-   * from the input summary.
-   *
-   * Once the function completes, inputSizeEstimated is set so that the logic is
-   * never run more than once.
-   */
-  private void estimateInputSize() {
-    if (inputSizeEstimated) {
-      // If we've already run this function, return
-      return;
-    }
-
-    MapWork mWork = work.getMapWork();
-
-    // Initialize the values to be those taken from the input summary
-    totalInputFileSize = inputSummary.getLength();
-    totalInputNumFiles = inputSummary.getFileCount();
-
-    if (mWork.getNameToSplitSample() == null || mWork.getNameToSplitSample().isEmpty()) {
-      // If percentage block sampling wasn't used, we don't need to do any estimation
-      inputSizeEstimated = true;
-      return;
-    }
-
-    // if all inputs are sampled, we should shrink the size of the input accordingly
-    double highestSamplePercentage = 0;
-    boolean allSample = false;
-    for (String alias : mWork.getAliasToWork().keySet()) {
-      if (mWork.getNameToSplitSample().containsKey(alias)) {
-        allSample = true;
-        Double rate = mWork.getNameToSplitSample().get(alias).getPercent();
-        if (rate != null && rate > highestSamplePercentage) {
-          highestSamplePercentage = rate;
-        }
-      } else {
-        allSample = false;
-        break;
-      }
-    }
-    if (allSample) {
-      // This is a little bit dangerous if inputs turns out not to be able to be sampled.
-      // In that case, we significantly underestimate the input.
-      // It's the same as estimateNumberOfReducers(). It's just our best
-      // guess and there is no guarantee.
-      totalInputFileSize = Math.min((long) (totalInputFileSize * highestSamplePercentage / 100D)
-          , totalInputFileSize);
-      totalInputNumFiles = Math.min((long) (totalInputNumFiles * highestSamplePercentage / 100D)
-          , totalInputNumFiles);
-    }
-
-    inputSizeEstimated = true;
-  }
-
-  /**
    * Find out if a job can be run in local mode based on it's characteristics
    *
    * @param conf Hive Configuration

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java?rev=1509081&r1=1509080&r2=1509081&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java Thu Aug  1 05:34:55 2013
@@ -192,7 +192,7 @@ public class BlockMergeTask extends Task
     try {
       addInputPaths(job, work);
 
-      Utilities.setMapWork(job, work, ctx.getMRTmpFileURI());
+      Utilities.setMapWork(job, work, ctx.getMRTmpFileURI(), true);
 
       // remove the pwd from conf file so that job tracker doesn't show this
       // logs

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java?rev=1509081&r1=1509080&r2=1509081&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java Thu Aug  1 05:34:55 2013
@@ -167,7 +167,9 @@ public class TestSymlinkTextInputFormat 
       QueryPlan plan = drv.getPlan();
       MapRedTask selectTask = (MapRedTask)plan.getRootTasks().get(0);
 
-      ExecDriver.addInputPaths(newJob, selectTask.getWork().getMapWork(), emptyScratchDir.toString(), ctx);
+      List<Path> inputPaths = Utilities.getInputPaths(newJob, selectTask.getWork().getMapWork(), emptyScratchDir.toString(), ctx);
+      Utilities.setInputPaths(newJob, inputPaths);
+
       Utilities.setMapRedWork(newJob, selectTask.getWork(), ctx.getMRTmpFileURI());
       
       CombineHiveInputFormat combineInputFormat = ReflectionUtils.newInstance(