You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by na...@apache.org on 2009/06/29 22:43:04 UTC
svn commit: r789440 - in /hadoop/hive/trunk: CHANGES.txt
ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
Author: namit
Date: Mon Jun 29 20:43:04 2009
New Revision: 789440
URL: http://svn.apache.org/viewvc?rev=789440&view=rev
Log:
HIVE-587. Duplicate results from multiple TIP
(Zheng Shao via namit)
Modified:
hadoop/hive/trunk/CHANGES.txt
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=789440&r1=789439&r2=789440&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Mon Jun 29 20:43:04 2009
@@ -279,6 +279,9 @@
HIVE-583. Fix spurious 'default database not found'.
(Prasad Chakka via zshao)
+ HIVE-587. Duplicate results from multiple TIPs (speculative task attempts).
+ (Zheng Shao via namit)
+
Release 0.3.1 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=789440&r1=789439&r2=789440&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Mon Jun 29 20:43:04 2009
@@ -206,16 +206,20 @@
String specPath = conf.getDirName();
fs = (new Path(specPath)).getFileSystem(hconf);
Path tmpPath = Utilities.toTempPath(specPath);
+ Path intermediatePath = new Path(tmpPath.getParent(), tmpPath.getName() + ".intermediate");
Path finalPath = new Path(specPath);
if(success) {
if(fs.exists(tmpPath)) {
- // Step1: rename tmp output folder to final path. After this point,
- // updates from speculative tasks still writing to tmpPath will not
- // appear in finalPath
- LOG.info("Moving tmp dir: " + tmpPath + " to: " + finalPath);
- renameOrMoveFiles(fs, tmpPath, finalPath);
- // Step2: Clean any temp files from finalPath
- Utilities.removeTempFiles(fs, finalPath);
+ // Step1: rename tmp output folder to intermediate path. After this
+ // point, updates from speculative tasks still writing to tmpPath
+ // will not appear in finalPath.
+ LOG.info("Moving tmp dir: " + tmpPath + " to: " + intermediatePath);
+ Utilities.rename(fs, tmpPath, intermediatePath);
+ // Step2: remove any tmp files or double-committed output files
+ Utilities.removeTempOrDuplicateFiles(fs, intermediatePath);
+ // Step3: move to the file destination
+ LOG.info("Moving tmp dir: " + intermediatePath + " to: " + finalPath);
+ Utilities.renameOrMoveFiles(fs, intermediatePath, finalPath);
}
} else {
fs.delete(tmpPath);
@@ -227,39 +231,4 @@
super.jobClose(hconf, success);
}
- /**
- * Rename src to dst, or in the case dst already exists, move files in src
- * to dst. If there is an existing file with the same name, the new file's
- * name will be appended with "_1", "_2", etc.
- * @param fs the FileSystem where src and dst are on.
- * @param src the src directory
- * @param dst the target directory
- * @throws IOException
- */
- static public void renameOrMoveFiles(FileSystem fs, Path src, Path dst)
- throws IOException, HiveException {
- if (!fs.exists(dst)) {
- if (!fs.rename(src, dst)) {
- throw new HiveException ("Unable to move: " + src + " to: " + dst);
- }
- } else {
- // move file by file
- FileStatus[] files = fs.listStatus(src);
- for (int i=0; i<files.length; i++) {
- Path srcFilePath = files[i].getPath();
- String fileName = srcFilePath.getName();
- Path dstFilePath = new Path(dst, fileName);
- if (fs.exists(dstFilePath)) {
- int suffix = 0;
- do {
- suffix++;
- dstFilePath = new Path(dst, fileName + "_" + suffix);
- } while (fs.exists(dstFilePath));
- }
- if (!fs.rename(srcFilePath, dstFilePath)) {
- throw new HiveException ("Unable to move: " + src + " to: " + dst);
- }
- }
- }
- }
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java?rev=789440&r1=789439&r2=789440&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java Mon Jun 29 20:43:04 2009
@@ -267,6 +267,9 @@
boolean new_abort = abort;
if(!abort) {
+ if(scriptError != null) {
+ throw new HiveException(scriptError);
+ }
// everything ok. try normal shutdown
try {
scriptOut.flush();
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=789440&r1=789439&r2=789440&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Mon Jun 29 20:43:04 2009
@@ -24,6 +24,8 @@
import java.net.URL;
import java.net.URLClassLoader;
import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import java.beans.*;
import org.apache.commons.lang.StringUtils;
@@ -50,22 +52,9 @@
import org.apache.hadoop.hive.ql.plan.*;
import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
import org.apache.hadoop.hive.ql.io.RCFile;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.metadata.Partition;
-import org.apache.hadoop.hive.serde2.io.ByteWritable;
-import org.apache.hadoop.hive.serde2.io.DoubleWritable;
-import org.apache.hadoop.hive.serde2.io.ShortWritable;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
@@ -82,7 +71,7 @@
public static enum ReduceField { KEY, VALUE, ALIAS };
private static volatile mapredWork gWork = null;
- static final private Log LOG = LogFactory.getLog("hive.ql.exec.Utilities");
+ static final private Log LOG = LogFactory.getLog(Utilities.class.getName());
public static void clearMapRedWork (Configuration job) {
try {
@@ -619,9 +608,81 @@
}
/**
- * Remove all temporary files from a given directory
+ * Rename src to dst. This is a plain rename: unlike renameOrMoveFiles,
+ * it does not fall back to a file-by-file move, and it throws a
+ * HiveException if the underlying FileSystem rename does not succeed.
+ * @param fs the FileSystem where src and dst are on.
+ * @param src the src directory
+ * @param dst the target directory
+ * @throws IOException
+ */
+ static public void rename(FileSystem fs, Path src, Path dst)
+ throws IOException, HiveException {
+ if (!fs.rename(src, dst)) {
+ throw new HiveException ("Unable to move: " + src + " to: " + dst);
+ }
+ }
+ /**
+ * Rename src to dst, or in the case dst already exists, move files in src
+ * to dst. If there is an existing file with the same name, the new file's
+ * name will be appended with "_1", "_2", etc.
+ * @param fs the FileSystem where src and dst are on.
+ * @param src the src directory
+ * @param dst the target directory
+ * @throws IOException
+ */
+ static public void renameOrMoveFiles(FileSystem fs, Path src, Path dst)
+ throws IOException, HiveException {
+ if (!fs.exists(dst)) {
+ if (!fs.rename(src, dst)) {
+ throw new HiveException ("Unable to move: " + src + " to: " + dst);
+ }
+ } else {
+ // move file by file
+ FileStatus[] files = fs.listStatus(src);
+ for (int i=0; i<files.length; i++) {
+ Path srcFilePath = files[i].getPath();
+ String fileName = srcFilePath.getName();
+ Path dstFilePath = new Path(dst, fileName);
+ if (fs.exists(dstFilePath)) {
+ int suffix = 0;
+ do {
+ suffix++;
+ dstFilePath = new Path(dst, fileName + "_" + suffix);
+ } while (fs.exists(dstFilePath));
+ }
+ if (!fs.rename(srcFilePath, dstFilePath)) {
+ throw new HiveException ("Unable to move: " + src + " to: " + dst);
+ }
+ }
+ }
+ }
+
+ /** The first group will contain the task id.
+ * The second group is the optional extension.
+ * The file name looks like: "24931_r_000000_0" or "24931_r_000000_0.gz"
+ */
+ static Pattern fileNameTaskIdRegex = Pattern.compile("^.*_([0-9]*)_[0-9](\\..*)?$");
+
+ /**
+ * Get the task id from the filename.
+ * E.g., get "000000" out of "24931_r_000000_0" or "24931_r_000000_0.gz"
+ */
+ public static String getTaskIdFromFilename(String filename) {
+ Matcher m = fileNameTaskIdRegex.matcher(filename);
+ if (!m.matches()) {
+ LOG.warn("Unable to get task id from file name: " + filename + ". Using full filename as task id.");
+ return filename;
+ } else {
+ String taskId = m.group(1);
+ LOG.debug("TaskId for " + filename + " = " + taskId);
+ return taskId;
+ }
+ }
+ /**
+ * Remove all temporary files and duplicate (double-committed) files from a given directory.
*/
- public static void removeTempFiles(FileSystem fs, Path path) throws IOException {
+ public static void removeTempOrDuplicateFiles(FileSystem fs, Path path) throws IOException {
if(path == null)
return;
@@ -629,12 +690,26 @@
if(items == null)
return;
+ HashMap<String, FileStatus> taskIdToFile = new HashMap<String, FileStatus>();
for(FileStatus one: items) {
if(isTempPath(one)) {
if(!fs.delete(one.getPath(), true)) {
throw new IOException ("Unable to delete tmp file: " + one.getPath());
}
}
+ String taskId = getTaskIdFromFilename(one.getPath().getName());
+ FileStatus otherFile = taskIdToFile.get(taskId);
+ if (otherFile == null) {
+ taskIdToFile.put(taskId, one);
+ } else {
+ if(!fs.delete(one.getPath(), true)) {
+ throw new IOException ("Unable to delete duplicate file: "
+ + one.getPath() + ". Existing file: " + otherFile.getPath());
+ } else {
+ LOG.warn("Duplicate taskid file removed: " + one.getPath()
+ + ". Existing file: " + otherFile.getPath());
+ }
+ }
}
}