You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by zs...@apache.org on 2009/02/19 03:36:26 UTC
svn commit: r745709 - in /hadoop/hive/trunk: ./
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/metadata/
Author: zshao
Date: Thu Feb 19 02:36:25 2009
New Revision: 745709
URL: http://svn.apache.org/viewvc?rev=745709&view=rev
Log:
HIVE-131. Remove uncommitted files from failed tasks. (Joydeep Sen Sarma via zshao)
Modified:
hadoop/hive/trunk/CHANGES.txt
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=745709&r1=745708&r2=745709&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Thu Feb 19 02:36:25 2009
@@ -127,6 +127,9 @@
BUG FIXES
+ HIVE-131. Remove uncommitted files from failed tasks.
+ (Joydeep Sen Sarma via zshao)
+
HIVE-264. TBinarySortable Protocol supports null characters. (zshao)
HIVE-255. Propagate user name to Hive metastore. (Prasad Chakka via zshao)
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=745709&r1=745708&r2=745709&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Thu Feb 19 02:36:25 2009
@@ -303,7 +303,6 @@
}
return r;
}
-
/**
* Execute a query plan using Hadoop
@@ -313,8 +312,10 @@
try {
setNumberOfReducers();
} catch(IOException e) {
- String statusMesg = "IOException occurred while acceesing HDFS to estimate the number of reducers.";
- console.printError(statusMesg);
+ String statusMesg = "IOException while accessing HDFS to estimate the number of reducers: "
+ + e.getMessage();
+ console.printError(statusMesg, "\n"
+ + org.apache.hadoop.util.StringUtils.stringifyException(e));
return 1;
}
@@ -355,7 +356,8 @@
int returnVal = 0;
FileSystem fs = null;
- RunningJob rj = null;
+ RunningJob rj = null, orig_rj = null;
+ boolean success = false;
try {
fs = FileSystem.get(job);
@@ -386,7 +388,7 @@
// make this client wait if job trcker is not behaving well.
Throttle.checkJobTracker(job, LOG);
- rj = jc.submitJob(job);
+ orig_rj = rj = jc.submitJob(job);
// add to list of running jobs so in case of abnormal shutdown can kill
// it.
@@ -396,8 +398,17 @@
jobInfo(rj);
rj = jobProgress(jc, rj);
+ if(rj == null) {
+ // in the corner case where the running job has disappeared from JT memory
+ // remember that we did actually submit the job.
+ rj = orig_rj;
+ success = false;
+ } else {
+ success = rj.isSuccessful();
+ }
+
String statusMesg = "Ended Job = " + rj.getJobID();
- if (!rj.isSuccessful()) {
+ if (!success) {
statusMesg += " with errors";
returnVal = 2;
console.printError(statusMesg);
@@ -405,7 +416,7 @@
console.printInfo(statusMesg);
}
} catch (Exception e) {
- String mesg = " with exception '" + e.getMessage() + "'";
+ String mesg = " with exception '" + Utilities.getNameMessage(e) + "'";
if (rj != null) {
mesg = "Ended Job = " + rj.getJobID() + mesg;
} else {
@@ -416,6 +427,7 @@
console.printError(mesg, "\n"
+ org.apache.hadoop.util.StringUtils.stringifyException(e));
+ success = false;
returnVal = 1;
} finally {
Utilities.clearMapRedWork(job);
@@ -428,6 +440,30 @@
} catch (Exception e) {
}
}
+
+ try {
+ if (rj != null) {
+ if(work.getAliasToWork() != null) {
+ for(Operator<? extends Serializable> op:
+ work.getAliasToWork().values()) {
+ op.jobClose(job, success);
+ }
+ }
+ if(work.getReducer() != null) {
+ work.getReducer().jobClose(job, success);
+ }
+ }
+ } catch (Exception e) {
+ // jobClose must execute successfully; otherwise fail the task
+ if(success) {
+ success = false;
+ returnVal = 3;
+ String mesg = "Job Commit failed with exception '" + Utilities.getNameMessage(e) + "'";
+ console.printError(mesg, "\n"
+ + org.apache.hadoop.util.StringUtils.stringifyException(e));
+ }
+ }
+
return (returnVal);
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=745709&r1=745708&r2=745709&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Thu Feb 19 02:36:25 2009
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.exec;
import java.io.*;
+import java.lang.reflect.Method;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
@@ -50,9 +51,13 @@
transient protected Path finalPath;
transient protected Serializer serializer;
transient protected BytesWritable commonKey = new BytesWritable();
-
+ transient protected boolean autoDelete = false;
+
private void commit() throws IOException {
- fs.rename(outPath, finalPath);
+ if(!fs.rename(outPath, finalPath)) {
+ throw new IOException ("Unable to rename output to: " + finalPath);
+ }
+ LOG.info("Committed to output file: " + finalPath);
}
public void close(boolean abort) throws HiveException {
@@ -68,7 +73,8 @@
} else {
try {
outWriter.close(abort);
- fs.delete(outPath, true);
+ if(!autoDelete)
+ fs.delete(outPath, true);
} catch (Exception e) {
e.printStackTrace();
}
@@ -91,8 +97,11 @@
}
fs = FileSystem.get(hconf);
- finalPath = new Path(conf.getDirName(), Utilities.getTaskId(hconf));
- outPath = new Path(conf.getDirName(), "_tmp."+Utilities.getTaskId(hconf));
+ finalPath = new Path(Utilities.toTempPath(conf.getDirName()), Utilities.getTaskId(hconf));
+ outPath = new Path(Utilities.toTempPath(conf.getDirName()), Utilities.toTempPath(Utilities.getTaskId(hconf)));
+
+ LOG.info("Writing to temp file: " + outPath);
+
OutputFormat<?, ?> outputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance();
final Class<? extends Writable> outputClass = serializer.getSerializedClass();
boolean isCompressed = conf.getCompressed();
@@ -100,7 +109,7 @@
// The reason to keep these instead of using OutputFormat.getRecordWriter() is that
// getRecordWriter does not give us enough control over the file name that we create.
if(outputFormat instanceof IgnoreKeyTextOutputFormat) {
- finalPath = new Path(conf.getDirName(), Utilities.getTaskId(hconf) +
+ finalPath = new Path(Utilities.toTempPath(conf.getDirName()), Utilities.getTaskId(hconf) +
Utilities.getFileExtension(jc, isCompressed));
String rowSeparatorString = conf.getTableInfo().getProperties().getProperty(Constants.LINE_DELIM, "\n");
@@ -145,6 +154,15 @@
// should never come here - we should be catching this in ddl command
throw new HiveException ("Illegal outputformat: " + outputFormat.getClass().getName());
}
+
+ // in recent hadoop versions, use deleteOnExit to clean tmp files.
+ try {
+ Method deleteOnExit = FileSystem.class.getDeclaredMethod("deleteOnExit", new Class [] {Path.class});
+ deleteOnExit.setAccessible(true);
+ deleteOnExit.invoke(fs, outPath);
+ autoDelete = true;
+ } catch (Exception e) {}
+
} catch (HiveException e) {
throw e;
} catch (Exception e) {
@@ -173,4 +191,33 @@
public String getName() {
return new String("FS");
}
+
+ @Override
+ public void jobClose(Configuration hconf, boolean success) throws HiveException {
+ try {
+ if(conf != null) {
+ fs = FileSystem.get(hconf);
+ Path tmpPath = Utilities.toTempPath(conf.getDirName());
+ Path finalPath = new Path(conf.getDirName());
+ if(success) {
+ if(fs.exists(tmpPath)) {
+ // Step1: rename tmp output folder to final path. After this point,
+ // updates from speculative tasks still writing to tmpPath will not
+ // appear in finalPath
+ LOG.info("Renaming tmp dir: " + tmpPath + " to: " + finalPath);
+ if(!fs.rename(tmpPath, finalPath)) {
+ throw new HiveException("Unable to commit result directory: " + finalPath);
+ }
+ // Step2: Clean any temp files from finalPath
+ Utilities.removeTempFiles(fs, finalPath);
+ }
+ } else {
+ fs.delete(tmpPath);
+ }
+ }
+ } catch (IOException e) {
+ throw new HiveException (e);
+ }
+ super.jobClose(hconf, success);
+ }
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=745709&r1=745708&r2=745709&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Thu Feb 19 02:36:25 2009
@@ -45,20 +45,6 @@
private static final long serialVersionUID = 1L;
- private void cleanseSource(FileSystem fs, Path sourcePath) throws IOException {
- if(sourcePath == null)
- return;
-
- FileStatus [] srcs = fs.globStatus(sourcePath);
- if(srcs != null) {
- for(FileStatus one: srcs) {
- if(Hive.needsDeletion(one)) {
- fs.delete(one.getPath(), true);
- }
- }
- }
- }
-
public int execute() {
try {
@@ -70,7 +56,6 @@
for(loadFileDesc lfd: work.getLoadFileWork()) {
Path targetPath = new Path(lfd.getTargetDir());
Path sourcePath = new Path(lfd.getSourceDir());
- cleanseSource(fs, sourcePath);
if (lfd.getIsDfsDir()) {
// Just do a rename on the URIs
String mesg = "Moving data to: " + lfd.getTargetDir();
@@ -95,11 +80,11 @@
if(dstFs.delete(targetPath, true) || !dstFs.exists(targetPath)) {
console.printInfo(mesg, mesg_detail);
- // if source exists, rename. Otherwise, create a empty directory
- if (fs.exists(sourcePath))
- fs.copyToLocalFile(sourcePath, targetPath);
- else
- dstFs.mkdirs(targetPath);
+ // if source exists, rename. Otherwise, create a empty directory
+ if (fs.exists(sourcePath))
+ fs.copyToLocalFile(sourcePath, targetPath);
+ else
+ dstFs.mkdirs(targetPath);
} else {
console.printInfo("Unable to delete the existing destination directory: " + targetPath);
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=745709&r1=745708&r2=745709&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Thu Feb 19 02:36:25 2009
@@ -266,6 +266,22 @@
}
}
+ /**
+ * Unlike other operator interfaces, which are called from a map or reduce task,
+ * jobClose is called on the job client side once the job has completed.
+ *
+ * @param conf Configuration with which the job was submitted
+ * @param success whether the job completed successfully or not
+ */
+ public void jobClose(Configuration conf, boolean success) throws HiveException {
+ if(childOperators == null)
+ return;
+
+ for(Operator<? extends Serializable> op: childOperators) {
+ op.jobClose(conf, success);
+ }
+ }
+
protected void forward(Object row, ObjectInspector rowInspector) throws HiveException {
if((childOperators == null) || (getDone())) {
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=745709&r1=745708&r2=745709&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Thu Feb 19 02:36:25 2009
@@ -30,6 +30,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -540,4 +541,54 @@
return src;
}
+
+ private static final String tmpPrefix = "_tmp.";
+
+ public static Path toTempPath(Path orig) {
+ if(orig.getName().indexOf(tmpPrefix) == 0)
+ return orig;
+ return new Path(orig.getParent(), tmpPrefix + orig.getName());
+ }
+
+ /**
+ * Given a path, convert to a temporary path
+ */
+ public static Path toTempPath(String orig) {
+ return toTempPath(new Path(orig));
+ }
+
+ /**
+ * Detect if the supplied file is a temporary path
+ */
+ public static boolean isTempPath(FileStatus file) {
+ String name = file.getPath().getName();
+ // in addition to detecting hive temporary files, we also check hadoop
+ // temporary folders that used to show up in older releases
+ return (name.startsWith("_task") || name.startsWith(tmpPrefix));
+ }
+
+ /**
+ * Remove all temporary files from a given directory
+ */
+ public static void removeTempFiles(FileSystem fs, Path path) throws IOException {
+ if(path == null)
+ return;
+
+ FileStatus items[] = fs.listStatus(path);
+ if(items == null)
+ return;
+
+ for(FileStatus one: items) {
+ if(isTempPath(one)) {
+ if(!fs.delete(one.getPath(), true)) {
+ throw new IOException ("Unable to delete tmp file: " + one.getPath());
+ }
+ }
+ }
+ }
+
+ public static String getNameMessage(Exception e) {
+ return e.getClass().getName() + "(" + e.getMessage() + ")";
+ }
+
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=745709&r1=745708&r2=745709&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Thu Feb 19 02:36:25 2009
@@ -53,6 +53,7 @@
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import com.facebook.thrift.TException;
import com.facebook.thrift.protocol.TBinaryProtocol;
@@ -660,26 +661,15 @@
}
}
- public static boolean needsDeletion(FileStatus file) {
- String name = file.getPath().getName();
- // There is a race condition in hadoop as a result of which
- // the _task files created in the output directory at the time
- // of the mapper is reported in the output directory even though
- // it is actually removed. The first check works around that
- // NOTE: it's not clear that this still affects recent versions of hadoop
-
- // the second check deals with uncommitted output files produced by hive tasks
- // this would typically happen on task failures or due to speculation
- return (name.startsWith("_task") || name.startsWith("_tmp."));
- }
-
private void checkPaths(FileSystem fs, FileStatus [] srcs, Path destf, boolean replace) throws HiveException {
try {
for(int i=0; i<srcs.length; i++) {
FileStatus [] items = fs.listStatus(srcs[i].getPath());
for(int j=0; j<items.length; j++) {
- if (needsDeletion(items[j])) {
+ if (Utilities.isTempPath(items[j])) {
+ // This check is redundant because temp files are removed by the execution layer before
+ // calling loadTable/Partition, but it is left in just in case.
fs.delete(items[j].getPath(), true);
continue;
}
@@ -809,6 +799,4 @@
throw new HiveException("Error in getting fields from serde." + e.getMessage(), e);
}
}
-
-
};