You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2008/09/09 19:36:24 UTC
svn commit: r693524 - in /hadoop/core/trunk: ./
src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/
src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/metadata/
src/contrib/hive/serde/src/java/org/apache/hadoop/hive/serde/simple_meta/
Author: dhruba
Date: Tue Sep 9 10:36:23 2008
New Revision: 693524
URL: http://svn.apache.org/viewvc?rev=693524&view=rev
Log:
HADOOP-4097. Make hive work well with speculative execution turned on.
(Joydeep Sen Sarma via dhruba)
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
hadoop/core/trunk/src/contrib/hive/serde/src/java/org/apache/hadoop/hive/serde/simple_meta/MetadataTypedSerDeField.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=693524&r1=693523&r2=693524&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Sep 9 10:36:23 2008
@@ -501,6 +501,9 @@
implementations and moves it to the JobTracker.
(Amareshwari Sriramadasu via ddas)
+ HADOOP-4097. Make hive work well with speculative execution turned on.
+ (Joydeep Sen Sarma via dhruba)
+
Release 0.18.1 - Unreleased
BUG FIXES
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=693524&r1=693523&r2=693524&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Tue Sep 9 10:36:23 2008
@@ -81,7 +81,7 @@
try {
fs = FileSystem.get(hconf);
finalPath = new Path(conf.getDirName(), Utilities.getTaskId(hconf));
- outPath = new Path(conf.getDirName(), "tmp."+Utilities.getTaskId(hconf));
+ outPath = new Path(conf.getDirName(), "_tmp."+Utilities.getTaskId(hconf));
OutputFormat outputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance();
if(outputFormat instanceof IgnoreKeyTextOutputFormat) {
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=693524&r1=693523&r2=693524&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Tue Sep 9 10:36:23 2008
@@ -19,13 +19,17 @@
package org.apache.hadoop.hive.ql.exec;
import java.io.Serializable;
+import java.io.IOException;
+
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.plan.loadFileDesc;
import org.apache.hadoop.hive.ql.plan.loadTableDesc;
import org.apache.hadoop.hive.ql.plan.moveWork;
+import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.util.StringUtils;
/**
@@ -35,6 +39,20 @@
private static final long serialVersionUID = 1L;
+ private void cleanseSource(FileSystem fs, Path sourcePath) throws IOException {
+ if(sourcePath == null)
+ return;
+
+ FileStatus [] srcs = fs.globStatus(sourcePath);
+ if(srcs != null) {
+ for(FileStatus one: srcs) {
+ if(Hive.needsDeletion(one)) {
+ fs.delete(one.getPath(), true);
+ }
+ }
+ }
+ }
+
public int execute() {
try {
@@ -46,6 +64,7 @@
for(loadFileDesc lfd: work.getLoadFileWork()) {
Path targetPath = new Path(lfd.getTargetDir());
Path sourcePath = new Path(lfd.getSourceDir());
+ cleanseSource(fs, sourcePath);
if (lfd.getIsDfsDir()) {
// Just do a rename on the URIs
String mesg = "Moving data to: " + lfd.getTargetDir();
Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=693524&r1=693523&r2=693524&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Tue Sep 9 10:36:23 2008
@@ -502,17 +502,28 @@
}
}
+ public static boolean needsDeletion(FileStatus file) {
+ String name = file.getPath().getName();
+ // There is a race condition in hadoop as a result of which
+ // the _task files created in the output directory at the time
+ // of the mapper are still reported in the output directory even
+ // though they have actually been removed. The first check works around that.
+ // NOTE: it's not clear that this still affects recent versions of hadoop
+
+ // The second check deals with uncommitted output files produced by hive tasks;
+ // these are typically left behind by task failures or by speculative execution.
+ return (name.startsWith("_task") || name.startsWith("_tmp."));
+ }
+
private void checkPaths(FileSystem fs, FileStatus [] srcs, Path destf, boolean replace) throws HiveException {
try {
for(int i=0; i<srcs.length; i++) {
FileStatus [] items = fs.listStatus(srcs[i].getPath());
for(int j=0; j<items.length; j++) {
- // There is a race condition in hadoop as a result of which
- // the _task files created in the output directory at the time
- // of the mapper is reported in the output directory even though
- // it is actually removed. The following check works around that
- if (items[j].getPath().getName().startsWith("_task")) {
- continue;
+
+ if (needsDeletion(items[j])) {
+ fs.delete(items[j].getPath(), true);
+ continue;
}
if(items[j].isDir()) {
throw new HiveException("checkPaths: "+srcs[i].toString()+" has nested directory"+
@@ -584,9 +595,6 @@
for(int i=0; i<srcs.length; i++) {
FileStatus[] items = fs.listStatus(srcs[i].getPath());
for(int j=0; j<items.length; j++) {
- if (items[j].getPath().getName().startsWith("_task")) {
- continue;
- }
boolean b = fs.rename(items[j].getPath(), new Path(tmppath, items[j].getPath().getName()));
LOG.debug("Renaming:"+items[j]+",Status:"+b);
}
Modified: hadoop/core/trunk/src/contrib/hive/serde/src/java/org/apache/hadoop/hive/serde/simple_meta/MetadataTypedSerDeField.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/serde/src/java/org/apache/hadoop/hive/serde/simple_meta/MetadataTypedSerDeField.java?rev=693524&r1=693523&r2=693524&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/serde/src/java/org/apache/hadoop/hive/serde/simple_meta/MetadataTypedSerDeField.java (original)
+++ hadoop/core/trunk/src/contrib/hive/serde/src/java/org/apache/hadoop/hive/serde/simple_meta/MetadataTypedSerDeField.java Tue Sep 9 10:36:23 2008
@@ -65,7 +65,7 @@
ColumnSet temp = (ColumnSet)obj;
if(temp.col.size() <= _position) {
- System.err.println("get " + temp.col.size() + "<=" + _position);
+ //System.err.println("get " + temp.col.size() + "<=" + _position);
return null;
}
try {