Posted to commits@hive.apache.org by zs...@apache.org on 2009/10/15 08:51:36 UTC
svn commit: r825412 - in /hadoop/hive/trunk: CHANGES.txt ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
Author: zshao
Date: Thu Oct 15 06:51:36 2009
New Revision: 825412
URL: http://svn.apache.org/viewvc?rev=825412&view=rev
Log:
HIVE-879. Fix null pointer for empty dir. (Namit Jain via zshao)
Modified:
hadoop/hive/trunk/CHANGES.txt
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=825412&r1=825411&r2=825412&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Thu Oct 15 06:51:36 2009
@@ -183,6 +183,8 @@
HIVE-877. More test cases for UDFOPNegative. (Ning Zhang via zshao)
+ HIVE-879. Fix null pointer for empty dir. (Namit Jain via zshao)
+
Release 0.4.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=825412&r1=825411&r2=825412&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Thu Oct 15 06:51:36 2009
@@ -53,6 +53,7 @@
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.varia.NullAppender;
import java.lang.ClassNotFoundException;
+import org.apache.hadoop.hive.common.FileUtils;
public class ExecDriver extends Task<mapredWork> implements Serializable {
@@ -144,10 +145,10 @@
/**
* In Hive, when the user control-c's the command line, any running jobs
* spawned from that command line are best-effort killed.
- *
+ *
* This static constructor registers a shutdown thread to iterate over all the
* running job kill URLs and do a get on them.
- *
+ *
*/
static {
if (new org.apache.hadoop.conf.Configuration().getBoolean(
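As the comment above describes, ExecDriver registers a JVM shutdown hook that best-effort kills running jobs by fetching their kill URLs. A minimal sketch of that mechanism (runningJobKillURIs and its shape are assumed here, not quoted from the Hive source):

    // Register a shutdown thread that GETs every recorded kill URL.
    Runtime.getRuntime().addShutdownHook(new Thread() {
      @Override
      public void run() {
        for (String killURL : runningJobKillURIs.values()) { // assumed Map of job id -> kill URL
          try {
            new java.net.URL(killURL).openStream().close(); // a plain HTTP GET triggers the kill
          } catch (Exception e) {
            // best-effort only: ignore failures during JVM shutdown
          }
        }
      }
    });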
@@ -195,7 +196,7 @@
+ " job -Dmapred.job.tracker=" + hp + " -kill " + rj.getJobID());
}
}
-
+
/**
* This class contains the state of the running task
* Going forward, we will return this handle from execute
@@ -223,7 +224,7 @@
}
public void progress(TaskHandle taskHandle) throws IOException {
- ExecDriverTaskHandle th = (ExecDriverTaskHandle)taskHandle;
+ ExecDriverTaskHandle th = (ExecDriverTaskHandle)taskHandle;
JobClient jc = th.getJobClient();
RunningJob rj = th.getRunningJob();
String lastReport = "";
@@ -238,7 +239,7 @@
}
th.setRunningJob(jc.getJob(rj.getJobID()));
updateCounters(th);
-
+
String report = " map = " + this.mapProgress + "%, reduce = " + this.reduceProgress + "%";
if (!report.equals(lastReport)
@@ -286,13 +287,13 @@
int maxReducers = hive.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
long totalInputFileSize = getTotalInputFileSize(job, work);
- LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers=" + maxReducers
+ LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers=" + maxReducers
+ " totalInputFileSize=" + totalInputFileSize);
int reducers = (int)((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
reducers = Math.max(1, reducers);
reducers = Math.min(maxReducers, reducers);
- return reducers;
+ return reducers;
}
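The expression above is a ceiling division, so any remainder of input bytes adds one more reducer before clamping. A worked example, assuming the usual defaults of 1GB for hive.exec.reducers.bytes.per.reducer and 999 for hive.exec.reducers.max:

    long bytesPerReducer = 1073741824L;    // 1GB (assumed default)
    long totalInputFileSize = 3500000000L; // ~3.26GB of input
    int reducers = (int) ((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
    // reducers == 4, i.e. ceil(3.26)
    reducers = Math.max(1, Math.min(999, reducers)); // clamp to [1, maxReducers]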
/**
@@ -301,13 +302,13 @@
protected void setNumberOfReducers() throws IOException {
// this is a temporary hack to fix things that are not fixed in the compiler
Integer numReducersFromWork = work.getNumReduceTasks();
-
+
if(work.getReducer() == null) {
console.printInfo("Number of reduce tasks is set to 0 since there's no reduce operator");
work.setNumReduceTasks(Integer.valueOf(0));
} else {
if (numReducersFromWork >= 0) {
- console.printInfo("Number of reduce tasks determined at compile time: " + work.getNumReduceTasks());
+ console.printInfo("Number of reduce tasks determined at compile time: " + work.getNumReduceTasks());
} else if (job.getNumReduceTasks() > 0) {
int reducers = job.getNumReduceTasks();
work.setNumReduceTasks(reducers);
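Taken together with the estimate above, the precedence is: a non-negative count fixed at compile time wins, then an explicit mapred.reduce.tasks from the job conf, and only then the size-based estimate. Condensed into a sketch (the estimator's name and arguments are assumed from the previous hunk):

    int n;
    if (numReducersFromWork >= 0) {
      n = numReducersFromWork;              // determined at compile time
    } else if (job.getNumReduceTasks() > 0) {
      n = job.getNumReduceTasks();          // user-supplied mapred.reduce.tasks
    } else {
      n = estimateNumberOfReducers(hive, job, work); // size-based estimate
    }
    work.setNumReduceTasks(Integer.valueOf(n));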
@@ -331,7 +332,7 @@
* Calculate the total size of input files.
* @param job the hadoop job conf.
* @return the total size in bytes.
- * @throws IOException
+ * @throws IOException
*/
public long getTotalInputFileSize(JobConf job, mapredWork work) throws IOException {
long r = 0;
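The rest of the method body falls outside this hunk; a minimal equivalent of what it has to compute (a sketch under assumptions, not the verbatim Hive code) sums the on-disk size of every input path in the plan:

    long total = 0;
    for (String onefile : work.getPathToAliases().keySet()) {
      Path p = new Path(onefile);
      FileSystem fs = p.getFileSystem(job);
      total += fs.getContentSummary(p).getLength(); // recursive byte count under the path
    }
    return total;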
@@ -385,7 +386,7 @@
return reduceProgress == 100;
}
-
+
/**
* Execute a query plan using Hadoop
*/
@@ -437,7 +438,7 @@
String auxJars = HiveConf.getVar(job, HiveConf.ConfVars.HIVEAUXJARS);
String addedJars = HiveConf.getVar(job, HiveConf.ConfVars.HIVEADDEDJARS);
if (StringUtils.isNotBlank(auxJars) || StringUtils.isNotBlank(addedJars)) {
- String allJars =
+ String allJars =
StringUtils.isNotBlank(auxJars)
? (StringUtils.isNotBlank(addedJars) ? addedJars + "," + auxJars : auxJars)
: addedJars;
@@ -455,7 +456,7 @@
if (StringUtils.isNotBlank(addedArchives)) {
initializeFiles("tmparchives", addedArchives);
}
-
+
int returnVal = 0;
RunningJob rj = null, orig_rj = null;
boolean success = false;
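The initializeFiles calls above populate the JobConf keys that Hadoop's GenericOptionsParser/DistributedCache machinery reads ("tmpjars", "tmpfiles", "tmparchives"), so the listed resources ship with the job. Its body is not shown in this diff; conceptually it appends onto any existing value, roughly:

    String cur = job.get("tmpjars"); // likewise for "tmpfiles" and "tmparchives"
    job.set("tmpjars", StringUtils.isNotBlank(cur) ? cur + "," + allJars : allJars);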
@@ -469,7 +470,7 @@
if (pwd != null)
job.set(HiveConf.ConfVars.METASTOREPWD.varname, "HIVE");
JobClient jc = new JobClient(job);
-
+
// make this client wait if job tracker is not behaving well.
Throttle.checkJobTracker(job, LOG);
@@ -597,7 +598,7 @@
BasicConfigurator.resetConfiguration();
BasicConfigurator.configure(new NullAppender());
}
-
+
if (planFileName == null) {
System.err.println("Must specify Plan File Name");
printUsage();
@@ -750,27 +751,29 @@
/**
* Handle an empty/null path for a given alias
*/
- private int addInputPath(String path, JobConf job, mapredWork work, String hiveScratchDir, int numEmptyPaths,
+ private int addInputPath(String path, JobConf job, mapredWork work, String hiveScratchDir, int numEmptyPaths,
boolean isEmptyPath, String alias) throws Exception {
// either the directory does not exist or it is empty
assert path == null || isEmptyPath;
// The input file does not exist, replace it by an empty file
Class<? extends HiveOutputFormat> outFileFormat = null;
-
- if (isEmptyPath)
+
+ if (isEmptyPath)
outFileFormat = work.getPathToPartitionInfo().get(path).getTableDesc().getOutputFileFormatClass();
else
outFileFormat = work.getAliasToPartnInfo().get(alias).getTableDesc().getOutputFileFormatClass();
-
+
// create a dummy empty file in a new directory
String newDir = hiveScratchDir + File.separator + (++numEmptyPaths);
Path newPath = new Path(newDir);
+ FileSystem fs = newPath.getFileSystem(job);
+ fs.mkdirs(newPath);
String newFile = newDir + File.separator + "emptyFile";
Path newFilePath = new Path(newFile);
LOG.info("Changed input file to " + newPath.toString());
-
+
// toggle the work
LinkedHashMap<String, ArrayList<String>> pathToAliases = work.getPathToAliases();
if (isEmptyPath) {
@@ -786,7 +789,7 @@
}
work.setPathToAliases(pathToAliases);
-
+
LinkedHashMap<String,partitionDesc> pathToPartitionInfo = work.getPathToPartitionInfo();
if (isEmptyPath) {
pathToPartitionInfo.put(newPath.toUri().toString(), pathToPartitionInfo.get(path));
@@ -797,7 +800,7 @@
pathToPartitionInfo.put(newPath.toUri().toString(), pDesc);
}
work.setPathToPartitionInfo(pathToPartitionInfo);
-
+
String onefile = newPath.toString();
RecordWriter recWriter = outFileFormat.newInstance().getHiveRecordWriter(job, newFilePath, Text.class, false, new Properties(), null);
recWriter.close(false);
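This block is the heart of the HIVE-879 fix. The two added lines (getFileSystem/mkdirs) create the scratch subdirectory before the table's own HiveOutputFormat writes a zero-row emptyFile into it; previously that directory was never created, which appears to be what triggered the null pointer the log message refers to. The pattern reduced to its essentials (a sketch; the surrounding code also rewires pathToAliases and pathToPartitionInfo):

    Path newPath = new Path(hiveScratchDir + File.separator + (++numEmptyPaths));
    FileSystem fs = newPath.getFileSystem(job);
    fs.mkdirs(newPath);                      // the step added by HIVE-879
    Path newFilePath = new Path(newPath, "emptyFile");
    RecordWriter rw = outFileFormat.newInstance().getHiveRecordWriter(
        job, newFilePath, Text.class, false, new Properties(), null);
    rw.close(false);                         // leaves a valid, empty input file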
@@ -815,13 +818,13 @@
LOG.info("Processing alias " + oneAlias);
List<String> emptyPaths = new ArrayList<String>();
- // The alias may not have any path
+ // The alias may not have any path
String path = null;
for (String onefile : work.getPathToAliases().keySet()) {
List<String> aliases = work.getPathToAliases().get(onefile);
if (aliases.contains(oneAlias)) {
path = onefile;
-
+
// Multiple aliases can point to the same path - it should be processed only once
if (pathsProcessed.contains(path))
continue;
@@ -849,7 +852,7 @@
numEmptyPaths = addInputPath(null, job, work, hiveScratchDir, numEmptyPaths, false, oneAlias);
}
}
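For context, addInputPath is reached in two ways (reconstructed from the hunks above as a sketch, not quoted verbatim): a directory that exists but holds no files is collected into emptyPaths and passed with isEmptyPath=true, while an alias that matched no path at all is handled with a null path, as in the line above:

    for (String emptyPath : emptyPaths)      // directory exists but is empty
      numEmptyPaths = addInputPath(emptyPath, job, work, hiveScratchDir,
                                   numEmptyPaths, true, oneAlias);
    if (path == null)                        // alias has no input path at all
      numEmptyPaths = addInputPath(null, job, work, hiveScratchDir,
                                   numEmptyPaths, false, oneAlias);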
-
+
public int getType() {
return StageType.MAPRED;
}