Posted to commits@hive.apache.org by zs...@apache.org on 2009/10/16 21:04:23 UTC
svn commit: r826039 - in /hadoop/hive/branches/branch-0.4: CHANGES.txt ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
Author: zshao
Date: Fri Oct 16 19:04:23 2009
New Revision: 826039
URL: http://svn.apache.org/viewvc?rev=826039&view=rev
Log:
HIVE-882. Create a new directory every time for scratch. (Namit Jain via zshao)
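The heart of this change is a simple pattern: derive a uniquely named scratch directory from the configured scratch root on every invocation, and retry creation a bounded number of times. The following is a minimal standalone sketch of that pattern, not the committed code: the class name ScratchDirExample and the helper makeScratchDir are invented here for illustration, while the Path/FileSystem calls mirror what ExecDriver uses.

// Minimal sketch, not Hive's code: create a per-invocation scratch
// directory under a configured root, retrying a bounded number of times.
import java.io.File;
import java.io.IOException;
import java.util.Random;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;

public class ScratchDirExample {
  private static final Random randGen = new Random();

  static Path makeScratchDir(JobConf job, String scratchRoot) throws IOException {
    int numTries = 3;
    while (true) {
      // A random suffix gives every invocation its own directory, so
      // concurrent queries cannot clobber each other's scratch space.
      Path dir = new Path(scratchRoot + File.separator + randGen.nextInt());
      try {
        FileSystem fs = dir.getFileSystem(job);
        if (fs.mkdirs(dir)) {
          return dir;
        }
      } catch (IOException e) {
        // fall through to the retry accounting below
      }
      if (--numTries == 0) {
        throw new IOException("Failed to make scratch dir under " + scratchRoot);
      }
    }
  }
}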
Modified:
hadoop/hive/branches/branch-0.4/CHANGES.txt
hadoop/hive/branches/branch-0.4/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
Modified: hadoop/hive/branches/branch-0.4/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/branches/branch-0.4/CHANGES.txt?rev=826039&r1=826038&r2=826039&view=diff
==============================================================================
--- hadoop/hive/branches/branch-0.4/CHANGES.txt (original)
+++ hadoop/hive/branches/branch-0.4/CHANGES.txt Fri Oct 16 19:04:23 2009
@@ -570,6 +570,9 @@
HIVE-878. Update the hash table entry before flushing in Group By
hash aggregation (Zheng Shao via namit)
+ HIVE-882. Create a new directory every time for scratch.
+ (Namit Jain via zshao)
+
Release 0.3.1 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/hive/branches/branch-0.4/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/branches/branch-0.4/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=826039&r1=826038&r2=826039&view=diff
==============================================================================
--- hadoop/hive/branches/branch-0.4/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hadoop/hive/branches/branch-0.4/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Fri Oct 16 19:04:23 2009
@@ -140,10 +140,10 @@
/**
* In Hive, when the user control-c's the command line, any running jobs
* spawned from that command line are best-effort killed.
- *
+ *
* This static constructor registers a shutdown thread to iterate over all the
* running job kill URLs and do a get on them.
- *
+ *
*/
static {
if (new org.apache.hadoop.conf.Configuration().getBoolean(
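As an aside on the mechanism the Javadoc above describes: a static initializer can register a JVM shutdown hook that walks a shared collection of kill URLs and issues a GET on each. The sketch below is illustrative only, not Hive's code; the field runningJobKillURIs is an assumption standing in for whatever bookkeeping ExecDriver actually keeps.

// Minimal sketch of a shutdown hook that GETs each registered kill URL.
import java.io.InputStream;
import java.net.URL;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class KillHookExample {
  // Kill URLs of running jobs (assumed bookkeeping, populated elsewhere).
  static final Set<String> runningJobKillURIs =
      Collections.synchronizedSet(new HashSet<String>());

  static {
    Runtime.getRuntime().addShutdownHook(new Thread() {
      @Override
      public void run() {
        synchronized (runningJobKillURIs) {
          for (String uri : runningJobKillURIs) {
            try {
              // A GET on the kill URL asks the JobTracker to kill the job.
              InputStream in = new URL(uri).openStream();
              in.close();
            } catch (Exception e) {
              // best-effort: ignore failures during shutdown
            }
          }
        }
      }
    });
  }
}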
@@ -245,13 +245,13 @@
int maxReducers = hive.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
long totalInputFileSize = getTotalInputFileSize(job, work);
- LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers=" + maxReducers
+ LOG.info("BytesPerReducer=" + bytesPerReducer + " maxReducers=" + maxReducers
+ " totalInputFileSize=" + totalInputFileSize);
int reducers = (int)((totalInputFileSize + bytesPerReducer - 1) / bytesPerReducer);
reducers = Math.max(1, reducers);
reducers = Math.min(maxReducers, reducers);
- return reducers;
+ return reducers;
}
/**
@@ -260,13 +260,13 @@
protected void setNumberOfReducers() throws IOException {
// this is a temporary hack to fix things that are not fixed in the compiler
Integer numReducersFromWork = work.getNumReduceTasks();
-
+
if(work.getReducer() == null) {
console.printInfo("Number of reduce tasks is set to 0 since there's no reduce operator");
work.setNumReduceTasks(Integer.valueOf(0));
} else {
if (numReducersFromWork >= 0) {
- console.printInfo("Number of reduce tasks determined at compile time: " + work.getNumReduceTasks());
+ console.printInfo("Number of reduce tasks determined at compile time: " + work.getNumReduceTasks());
} else if (job.getNumReduceTasks() > 0) {
int reducers = job.getNumReduceTasks();
work.setNumReduceTasks(reducers);
@@ -290,7 +290,7 @@
* Calculate the total size of input files.
* @param job the hadoop job conf.
* @return the total size in bytes.
- * @throws IOException
+ * @throws IOException
*/
public long getTotalInputFileSize(JobConf job, mapredWork work) throws IOException {
long r = 0;
@@ -330,7 +330,28 @@
String hiveScratchDir = HiveConf.getVar(job, HiveConf.ConfVars.SCRATCHDIR);
- Path jobScratchDir = new Path(hiveScratchDir + Utilities.randGen.nextInt());
+ String jobScratchDirStr = hiveScratchDir + File.separator + Utilities.randGen.nextInt();
+ Path jobScratchDir = new Path(jobScratchDirStr);
+ String emptyScratchDirStr = null;
+ Path emptyScratchDir = null;
+
+ int numTries = 3;
+ while (numTries > 0) {
+ emptyScratchDirStr = hiveScratchDir + File.separator + Utilities.randGen.nextInt();
+ emptyScratchDir = new Path(emptyScratchDirStr);
+
+ try {
+ FileSystem fs = emptyScratchDir.getFileSystem(job);
+ fs.mkdirs(emptyScratchDir);
+ break;
+ } catch (Exception e) {
+ numTries--;
+ if (numTries == 0) {
+ throw new RuntimeException("Failed to make dir " + emptyScratchDir.toString() + " : " + e.getMessage());
+ }
+ }
+ }
+
FileOutputFormat.setOutputPath(job, jobScratchDir);
job.setMapperClass(ExecMapper.class);
@@ -350,7 +371,7 @@
String auxJars = HiveConf.getVar(job, HiveConf.ConfVars.HIVEAUXJARS);
String addedJars = HiveConf.getVar(job, HiveConf.ConfVars.HIVEADDEDJARS);
if (StringUtils.isNotBlank(auxJars) || StringUtils.isNotBlank(addedJars)) {
- String allJars =
+ String allJars =
StringUtils.isNotBlank(auxJars)
? (StringUtils.isNotBlank(addedJars) ? addedJars + "," + auxJars : auxJars)
: addedJars;
@@ -363,13 +384,13 @@
if (StringUtils.isNotBlank(addedFiles)) {
initializeFiles("tmpfiles", addedFiles);
}
-
+
int returnVal = 0;
RunningJob rj = null, orig_rj = null;
boolean success = false;
try {
- addInputPaths(job, work, hiveScratchDir);
+ addInputPaths(job, work, emptyScratchDirStr);
Utilities.setMapRedWork(job, work);
// remove the pwd from the conf file so that the job tracker doesn't show it in the logs
@@ -430,6 +451,7 @@
try {
FileSystem fs = jobScratchDir.getFileSystem(job);
fs.delete(jobScratchDir, true);
+ fs.delete(emptyScratchDir, true);
if (returnVal != 0 && rj != null) {
rj.killJob();
}
@@ -503,7 +525,7 @@
BasicConfigurator.resetConfiguration();
BasicConfigurator.configure(new NullAppender());
}
-
+
if (planFileName == null) {
System.err.println("Must specify Plan File Name");
printUsage();
@@ -656,23 +678,23 @@
* Handle an empty/null path for a given alias
*/
private int addInputPath(String path, JobConf job, mapredWork work, String hiveScratchDir, int numEmptyPaths, boolean isEmptyPath,
- String alias)
+ String alias)
throws Exception {
// either the directory does not exist or it is empty
assert path == null || isEmptyPath;
// The input file does not exist, replace it with an empty file
Class<? extends HiveOutputFormat> outFileFormat = null;
-
- if (isEmptyPath)
+
+ if (isEmptyPath)
outFileFormat = work.getPathToPartitionInfo().get(path).getTableDesc().getOutputFileFormatClass();
else
outFileFormat = (Class<? extends HiveOutputFormat>)(HiveSequenceFileOutputFormat.class);
-
+
String newFile = hiveScratchDir + File.separator + (++numEmptyPaths);
Path newPath = new Path(newFile);
LOG.info("Changed input file to " + newPath.toString());
-
+
// toggle the work
LinkedHashMap<String, ArrayList<String>> pathToAliases = work.getPathToAliases();
if (isEmptyPath) {
@@ -688,7 +710,7 @@
}
work.setPathToAliases(pathToAliases);
-
+
LinkedHashMap<String,partitionDesc> pathToPartitionInfo = work.getPathToPartitionInfo();
if (isEmptyPath) {
pathToPartitionInfo.put(newPath.toUri().toString(), pathToPartitionInfo.get(path));
@@ -701,7 +723,7 @@
pathToPartitionInfo.put(newPath.toUri().toString(), pDesc);
}
work.setPathToPartitionInfo(pathToPartitionInfo);
-
+
String onefile = newPath.toString();
RecordWriter recWriter = outFileFormat.newInstance().getHiveRecordWriter(job, newPath, Text.class, false, new Properties(), null);
recWriter.close(false);
@@ -719,13 +741,13 @@
LOG.info("Processing alias " + oneAlias);
List<String> emptyPaths = new ArrayList<String>();
- // The alias may not have any path
+ // The alias may not have any path
String path = null;
for (String onefile : work.getPathToAliases().keySet()) {
List<String> aliases = work.getPathToAliases().get(onefile);
if (aliases.contains(oneAlias)) {
path = onefile;
-
+
// Multiple aliases can point to the same path - it should be processed only once
if (pathsProcessed.contains(path))
continue;