Posted to commits@hive.apache.org by zs...@apache.org on 2009/10/15 08:51:36 UTC

svn commit: r825412 - in /hadoop/hive/trunk: CHANGES.txt ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java

Author: zshao
Date: Thu Oct 15 06:51:36 2009
New Revision: 825412

URL: http://svn.apache.org/viewvc?rev=825412&view=rev
Log:
HIVE-879. Fix null pointer for empty dir. (Namit Jain via zshao)

Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=825412&r1=825411&r2=825412&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Thu Oct 15 06:51:36 2009
@@ -183,6 +183,8 @@
 
     HIVE-877. More test cases for UDFOPNegative. (Ning Zhang via zshao)
 
+    HIVE-879. Fix null pointer for empty dir. (Namit Jain via zshao)
+
 Release 0.4.0 -  Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=825412&r1=825411&r2=825412&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Thu Oct 15 06:51:36 2009
@@ -53,6 +53,7 @@
 import org.apache.log4j.BasicConfigurator;
 import org.apache.log4j.varia.NullAppender;
 import java.lang.ClassNotFoundException;
+import org.apache.hadoop.hive.common.FileUtils;
 
 public class ExecDriver extends Task<mapredWork> implements Serializable {
 
@@ -144,10 +145,10 @@
   /**
    * In Hive, when the user control-c's the command line, any running jobs
    * spawned from that command line are best-effort killed.
-   * 
+   *
    * This static constructor registers a shutdown thread to iterate over all the
    * running job kill URLs and do a get on them.
-   * 
+   *
    */
   static {
     if (new org.apache.hadoop.conf.Configuration().getBoolean(
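
The javadoc above describes the kill-on-shutdown mechanism in prose. As a rough
sketch of the pattern (the class, the map name, and its structure are
assumptions for illustration, not the Hive source), registering such a hook
looks like:

    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Sketch of the shutdown-hook pattern described in the javadoc above.
    class JobKillHook {
      // job ID -> kill URL, maintained while jobs are running
      static final Map<String, String> runningJobKillURIs =
          new ConcurrentHashMap<String, String>();

      static {
        Runtime.getRuntime().addShutdownHook(new Thread() {
          public void run() {
            for (String uri : runningJobKillURIs.values()) {
              try {
                // best-effort GET against each registered kill URL
                HttpURLConnection conn =
                    (HttpURLConnection) new URL(uri).openConnection();
                conn.getResponseCode();
                conn.disconnect();
              } catch (Exception ignored) {
                // shutdown is best-effort; failures are swallowed
              }
            }
          }
        });
      }
    }
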
@@ -195,7 +196,7 @@
           + " job  -Dmapred.job.tracker=" + hp + " -kill " + rj.getJobID());
     }
   }
-  
+
   /**
    * This class contains the state of the running task
    * Going forward, we will return this handle from execute
@@ -223,7 +224,7 @@
   }
 
   public void progress(TaskHandle taskHandle) throws IOException {
-    ExecDriverTaskHandle th = (ExecDriverTaskHandle)taskHandle; 
+    ExecDriverTaskHandle th = (ExecDriverTaskHandle)taskHandle;
     JobClient jc = th.getJobClient();
     RunningJob rj = th.getRunningJob();
     String lastReport = "";
@@ -238,7 +239,7 @@
       }
       th.setRunningJob(jc.getJob(rj.getJobID()));
       updateCounters(th);
-      
+
       String report = " map = " + this.mapProgress + "%,  reduce = " + this.reduceProgress + "%";
 
       if (!report.equals(lastReport)
@@ -286,13 +287,13 @@
     int maxReducers = hive.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
     long totalInputFileSize = getTotalInputFileSize(job, work);
 
-    LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers=" + maxReducers 
+    LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers=" + maxReducers
         + " totalInputFileSize=" + totalInputFileSize);
 
     int reducers = (int)((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
     reducers = Math.max(1, reducers);
     reducers = Math.min(maxReducers, reducers);
-    return reducers;    
+    return reducers;
   }
 
   /**
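
The reducer estimate above is a ceiling division of the total input size by
bytesPerReducer, clamped to the range [1, maxReducers]. The same arithmetic in
isolation (a sketch, assuming bytesPerReducer is a long as the log line
suggests):

    // Same arithmetic as the estimate above, extracted for clarity.
    static int estimateReducers(long totalInputFileSize, long bytesPerReducer,
                                int maxReducers) {
      // (a + b - 1) / b is integer ceiling division
      int reducers = (int) ((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
      reducers = Math.max(1, reducers);           // at least one reducer
      reducers = Math.min(maxReducers, reducers); // respect the configured cap
      return reducers;
    }

For example, 2,500,000,000 bytes of input with bytesPerReducer of
1,000,000,000 yields 3 reducers; an empty input still gets one reducer via the
Math.max clamp.
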
@@ -301,13 +302,13 @@
   protected void setNumberOfReducers() throws IOException {
     // this is a temporary hack to fix things that are not fixed in the compiler
     Integer numReducersFromWork = work.getNumReduceTasks();
-    
+
     if(work.getReducer() == null) {
       console.printInfo("Number of reduce tasks is set to 0 since there's no reduce operator");
       work.setNumReduceTasks(Integer.valueOf(0));
     } else {
       if (numReducersFromWork >= 0) {
-        console.printInfo("Number of reduce tasks determined at compile time: " + work.getNumReduceTasks()); 
+        console.printInfo("Number of reduce tasks determined at compile time: " + work.getNumReduceTasks());
       } else if (job.getNumReduceTasks() > 0) {
         int reducers = job.getNumReduceTasks();
         work.setNumReduceTasks(reducers);
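
setNumberOfReducers resolves the reducer count in priority order: zero when the
plan has no reduce operator, then the value fixed at compile time, then
mapred.reduce.tasks from the job conf, and presumably (beyond what this hunk
shows) the size-based estimate. A condensed sketch of that precedence, with
hypothetical parameter names:

    // Condensed precedence sketch (hypothetical helper, not the Hive method).
    static int resolveReducers(boolean hasReduceOperator, Integer fromPlan,
                               int fromJobConf, int estimated) {
      if (!hasReduceOperator) return 0;          // no reduce operator, no reduce tasks
      if (fromPlan != null && fromPlan >= 0) return fromPlan; // fixed at compile time
      if (fromJobConf > 0) return fromJobConf;   // honor mapred.reduce.tasks
      return estimated;                          // fall back to the size-based estimate
    }
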
@@ -331,7 +332,7 @@
    * Calculate the total size of input files.
    * @param job the hadoop job conf.
    * @return the total size in bytes.
-   * @throws IOException 
+   * @throws IOException
    */
   public long getTotalInputFileSize(JobConf job, mapredWork work) throws IOException {
     long r = 0;
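
The hunk cuts off the body of getTotalInputFileSize. One plausible way to
compute such a total with the standard Hadoop FileSystem API (an assumption
about the approach, not the committed code):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.JobConf;

    // Sketch: sum the bytes under every input path of the job.
    class InputSizeSketch {
      static long totalInputSize(JobConf job) throws IOException {
        long total = 0;
        for (Path p : FileInputFormat.getInputPaths(job)) {
          FileSystem fs = p.getFileSystem(job);
          total += fs.getContentSummary(p).getLength(); // bytes under this path
        }
        return total;
      }
    }
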
@@ -385,7 +386,7 @@
     return reduceProgress == 100;
   }
 
-  
+
   /**
    * Execute a query plan using Hadoop
    */
@@ -437,7 +438,7 @@
     String auxJars = HiveConf.getVar(job, HiveConf.ConfVars.HIVEAUXJARS);
     String addedJars = HiveConf.getVar(job, HiveConf.ConfVars.HIVEADDEDJARS);
     if (StringUtils.isNotBlank(auxJars) || StringUtils.isNotBlank(addedJars)) {
-      String allJars = 
+      String allJars =
         StringUtils.isNotBlank(auxJars)
         ? (StringUtils.isNotBlank(addedJars) ? addedJars + "," + auxJars : auxJars)
         : addedJars;
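
The nested ternary above comma-joins whichever of the two jar lists is
non-blank, putting addedJars first when both are present. An equivalent,
spelled-out form (sketch only):

    import org.apache.commons.lang.StringUtils;

    // Equivalent, spelled-out form of the allJars expression above.
    class JarMergeSketch {
      static String mergeJars(String auxJars, String addedJars) {
        boolean haveAux = StringUtils.isNotBlank(auxJars);
        boolean haveAdded = StringUtils.isNotBlank(addedJars);
        if (haveAux && haveAdded) {
          return addedJars + "," + auxJars;
        }
        return haveAux ? auxJars : addedJars;
      }
    }
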
@@ -455,7 +456,7 @@
     if (StringUtils.isNotBlank(addedArchives)) {
       initializeFiles("tmparchives", addedArchives);
     }
-    
+
     int returnVal = 0;
     RunningJob rj = null, orig_rj = null;
     boolean success = false;
@@ -469,7 +470,7 @@
       if (pwd != null)
         job.set(HiveConf.ConfVars.METASTOREPWD.varname, "HIVE");
       JobClient jc = new JobClient(job);
-      
+
 
       // make this client wait if the job tracker is not behaving well.
       Throttle.checkJobTracker(job, LOG);
@@ -597,7 +598,7 @@
       BasicConfigurator.resetConfiguration();
       BasicConfigurator.configure(new NullAppender());
     }
-    
+
     if (planFileName == null) {
       System.err.println("Must specify Plan File Name");
       printUsage();
@@ -750,27 +751,29 @@
   /**
   * Handle an empty/null path for a given alias
    */
-  private int addInputPath(String path, JobConf job, mapredWork work, String hiveScratchDir, int numEmptyPaths, 
+  private int addInputPath(String path, JobConf job, mapredWork work, String hiveScratchDir, int numEmptyPaths,
                            boolean isEmptyPath, String alias) throws Exception {
     // either the directory does not exist or it is empty
     assert path == null || isEmptyPath;
 
     // The input file does not exist; replace it with an empty file
     Class<? extends HiveOutputFormat> outFileFormat = null;
- 
-    if (isEmptyPath) 
+
+    if (isEmptyPath)
       outFileFormat = work.getPathToPartitionInfo().get(path).getTableDesc().getOutputFileFormatClass();
     else
       outFileFormat = work.getAliasToPartnInfo().get(alias).getTableDesc().getOutputFileFormatClass();
-    
+
     // create a dummy empty file in a new directory
     String newDir = hiveScratchDir + File.separator + (++numEmptyPaths);
     Path newPath = new Path(newDir);
+    FileSystem fs = newPath.getFileSystem(job);
+    fs.mkdirs(newPath);
     String newFile = newDir + File.separator + "emptyFile";
     Path newFilePath = new Path(newFile);
 
     LOG.info("Changed input file to " + newPath.toString());
-    
+
     // toggle the work
     LinkedHashMap<String, ArrayList<String>> pathToAliases = work.getPathToAliases();
     if (isEmptyPath) {
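
The two added lines in the hunk above are the substance of HIVE-879: the
scratch subdirectory is now created explicitly before the dummy empty file is
written into it, where previously the missing directory surfaced as the
reported null pointer. The fixed sequence, isolated into a hypothetical helper:

    import java.io.File;
    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.JobConf;

    // Hypothetical helper mirroring the fixed sequence in the diff above.
    class EmptyFileSketch {
      static Path newEmptyFilePath(JobConf job, String hiveScratchDir,
                                   int numEmptyPaths) throws IOException {
        Path newPath = new Path(hiveScratchDir + File.separator + numEmptyPaths);
        FileSystem fs = newPath.getFileSystem(job); // FileSystem owning the scratch dir
        fs.mkdirs(newPath);                         // the added call: create the dir first
        return new Path(newPath, "emptyFile");      // the dummy file is then created here
      }
    }
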
@@ -786,7 +789,7 @@
     }
 
     work.setPathToAliases(pathToAliases);
-    
+
     LinkedHashMap<String,partitionDesc> pathToPartitionInfo = work.getPathToPartitionInfo();
     if (isEmptyPath) {
       pathToPartitionInfo.put(newPath.toUri().toString(), pathToPartitionInfo.get(path));
@@ -797,7 +800,7 @@
       pathToPartitionInfo.put(newPath.toUri().toString(), pDesc);
     }
     work.setPathToPartitionInfo(pathToPartitionInfo);
-    
+
     String onefile = newPath.toString();
     RecordWriter recWriter = outFileFormat.newInstance().getHiveRecordWriter(job, newFilePath, Text.class, false, new Properties(), null);
     recWriter.close(false);
@@ -815,13 +818,13 @@
       LOG.info("Processing alias " + oneAlias);
       List<String> emptyPaths     = new ArrayList<String>();
 
-      // The alias may not have any path 
+      // The alias may not have any path
       String path = null;
       for (String onefile : work.getPathToAliases().keySet()) {
         List<String> aliases = work.getPathToAliases().get(onefile);
         if (aliases.contains(oneAlias)) {
           path = onefile;
-      
+
           // Multiple aliases can point to the same path - it should be processed only once
           if (pathsProcessed.contains(path))
             continue;
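
As the comment above notes, several aliases can map to one input path, so each
path must be handled only once; pathsProcessed realizes the usual seen-set
deduplication pattern, sketched generically here:

    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    // Generic form of the pathsProcessed deduplication above.
    class DedupSketch {
      static void processEachPathOnce(List<String> paths) {
        Set<String> seen = new HashSet<String>();
        for (String path : paths) {
          if (!seen.add(path)) {
            continue; // already handled: several aliases pointed at this path
          }
          // ... process the path exactly once ...
        }
      }
    }
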
@@ -849,7 +852,7 @@
         numEmptyPaths = addInputPath(null, job, work, hiveScratchDir, numEmptyPaths, false, oneAlias);
     }
   }
-  
+
   public int getType() {
     return StageType.MAPRED;
   }