You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by he...@apache.org on 2011/07/21 08:46:20 UTC
svn commit: r1149047 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql:
exec/ExecDriver.java exec/FileSinkOperator.java exec/Utilities.java
io/RCFileOutputFormat.java io/rcfile/merge/BlockMergeTask.java
io/rcfile/merge/RCFileMergeMapper.java
Author: heyongqiang
Date: Thu Jul 21 06:46:17 2011
New Revision: 1149047
URL: http://svn.apache.org/viewvc?rev=1149047&view=rev
Log:
HIVE-2201: reduce name node calls in Hive by creating temporary directories (Siying Dong via He Yongqiang)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=1149047&r1=1149046&r2=1149047&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Thu Jul 21 06:46:17 2011
@@ -108,6 +108,7 @@ public class ExecDriver extends Task<Map
this.jobExecHelper = new HadoopJobExecHelper(job, console, this, this);
}
+ @Override
public boolean requireLock() {
return true;
}
@@ -205,6 +206,44 @@ public class ExecDriver extends Task<Map
return false;
}
+ protected void createTmpDirs() throws IOException {
+ // fix up outputs
+ Map<String, ArrayList<String>> pa = work.getPathToAliases();
+ if (pa != null) {
+ ArrayList<Operator<? extends Serializable>> opList = new ArrayList<Operator<? extends Serializable>>();
+
+ if (work.getReducer() != null) {
+ opList.add(work.getReducer());
+ }
+
+ for (List<String> ls : pa.values()) {
+ for (String a : ls) {
+ opList.add(work.getAliasToWork().get(a));
+
+ while (!opList.isEmpty()) {
+ Operator<? extends Serializable> op = opList.remove(0);
+
+ if (op instanceof FileSinkOperator) {
+ FileSinkDesc fdesc = ((FileSinkOperator) op).getConf();
+ String tempDir = fdesc.getDirName();
+
+ if (tempDir != null) {
+ Path tempPath = Utilities.toTempPath(new Path(tempDir));
+ LOG.info("Making Temp Directory: " + tempDir);
+ FileSystem fs = tempPath.getFileSystem(job);
+ fs.mkdirs(tempPath);
+ }
+ }
+
+ if (op.getChildOperators() != null) {
+ opList.addAll(op.getChildOperators());
+ }
+ }
+ }
+ }
+ }
+ }
+
/**
* Execute a query plan using Hadoop.
*/
@@ -405,6 +444,8 @@ public class ExecDriver extends Task<Map
}
}
+ this.createTmpDirs();
+
// Finally SUBMIT the JOB!
rj = jc.submitJob(job);
// replace it back
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1149047&r1=1149046&r2=1149047&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Thu Jul 21 06:46:17 2011
@@ -52,10 +52,10 @@ import org.apache.hadoop.hive.serde2.Ser
import org.apache.hadoop.hive.serde2.Serializer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.SubStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
@@ -100,6 +100,7 @@ public class FileSinkOperator extends Te
public class FSPaths implements Cloneable {
Path tmpPath;
+ Path taskOutputTempPath;
Path[] outPaths;
Path[] finalPaths;
RecordWriter[] outWriters;
@@ -110,6 +111,7 @@ public class FileSinkOperator extends Te
public FSPaths(Path specPath) {
tmpPath = Utilities.toTempPath(specPath);
+ taskOutputTempPath = Utilities.toTaskTempPath(specPath);
outPaths = new Path[numFiles];
finalPaths = new Path[numFiles];
outWriters = new RecordWriter[numFiles];
@@ -129,6 +131,14 @@ public class FileSinkOperator extends Te
/**
* Update OutPath according to tmpPath.
*/
+ public Path getTaskOutPath(String taskId) {
+ return getOutPath(taskId, this.taskOutputTempPath);
+ }
+
+
+ /**
+ * Update OutPath according to tmpPath.
+ */
public Path getOutPath(String taskId) {
return getOutPath(taskId, this.tmpPath);
}
@@ -182,14 +192,17 @@ public class FileSinkOperator extends Te
private void commit(FileSystem fs) throws HiveException {
for (int idx = 0; idx < outPaths.length; ++idx) {
try {
+ if (bDynParts && !fs.exists(finalPaths[idx].getParent())) {
+ fs.mkdirs(finalPaths[idx].getParent());
+ }
if (!fs.rename(outPaths[idx], finalPaths[idx])) {
- throw new HiveException("Unable to rename output to: "
- + finalPaths[idx]);
+ throw new HiveException("Unable to rename output from: " +
+ outPaths[idx] + " to: " + finalPaths[idx]);
}
updateProgress();
} catch (IOException e) {
- throw new HiveException(e + "Unable to rename output to: "
- + finalPaths[idx]);
+ throw new HiveException("Unable to rename output from: " +
+ outPaths[idx] + " to: " + finalPaths[idx], e);
}
}
}
@@ -425,7 +438,7 @@ public class FileSinkOperator extends Te
if (isNativeTable) {
fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId);
LOG.info("Final Path: FS " + fsp.finalPaths[filesIdx]);
- fsp.outPaths[filesIdx] = fsp.getOutPath(taskId);
+ fsp.outPaths[filesIdx] = fsp.getTaskOutPath(taskId);
LOG.info("Writing to temp file: FS " + fsp.outPaths[filesIdx]);
} else {
fsp.finalPaths[filesIdx] = fsp.outPaths[filesIdx] = specPath;
@@ -616,6 +629,7 @@ public class FileSinkOperator extends Te
}
fsp2 = new FSPaths(specPath);
fsp2.tmpPath = new Path(fsp2.tmpPath, dpDir);
+ fsp2.taskOutputTempPath = new Path(fsp2.taskOutputTempPath, dpDir);
createBucketFiles(fsp2);
valToPaths.put(dpDir, fsp2);
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1149047&r1=1149046&r2=1149047&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Thu Jul 21 06:46:17 2011
@@ -117,8 +117,8 @@ import org.apache.hadoop.hive.ql.plan.Ma
import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
-import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils.ExpressionTypes;
import org.apache.hadoop.hive.ql.stats.StatsFactory;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
@@ -133,8 +133,8 @@ import org.apache.hadoop.hive.serde2.typ
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapred.FileOutputFormat;
@@ -1012,6 +1012,18 @@ public final class Utilities {
}
private static final String tmpPrefix = "_tmp.";
+ private static final String taskTmpPrefix = "_task_tmp.";
+
+ public static Path toTaskTempPath(Path orig) {
+ if (orig.getName().indexOf(taskTmpPrefix) == 0) {
+ return orig;
+ }
+ return new Path(orig.getParent(), taskTmpPrefix + orig.getName());
+ }
+
+ public static Path toTaskTempPath(String orig) {
+ return toTaskTempPath(new Path(orig));
+ }
public static Path toTempPath(Path orig) {
if (orig.getName().indexOf(tmpPrefix) == 0) {
@@ -1211,6 +1223,7 @@ public final class Utilities {
FileSystem fs = (new Path(specPath)).getFileSystem(hconf);
Path tmpPath = Utilities.toTempPath(specPath);
+ Path taskTmpPath = Utilities.toTaskTempPath(specPath);
Path intermediatePath = new Path(tmpPath.getParent(), tmpPath.getName()
+ ".intermediate");
Path finalPath = new Path(specPath);
@@ -1236,6 +1249,7 @@ public final class Utilities {
} else {
fs.delete(tmpPath, true);
}
+ fs.delete(taskTmpPath, true);
}
/**
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java?rev=1149047&r1=1149046&r2=1149047&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java Thu Jul 21 06:46:17 2011
@@ -77,9 +77,6 @@ public class RCFileOutputFormat extends
Path outputPath = getWorkOutputPath(job);
FileSystem fs = outputPath.getFileSystem(job);
- if (!fs.exists(outputPath)) {
- fs.mkdirs(outputPath);
- }
Path file = new Path(outputPath, name);
CompressionCodec codec = null;
if (getCompressOutput(job)) {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java?rev=1149047&r1=1149046&r2=1149047&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/BlockMergeTask.java Thu Jul 21 06:46:17 2011
@@ -75,6 +75,7 @@ public class BlockMergeTask extends Task
jobExecHelper = new HadoopJobExecHelper(job, this.console, this, this);
}
+ @Override
public boolean requireLock() {
return true;
}
@@ -140,6 +141,17 @@ public class BlockMergeTask extends Task
}
String outputPath = this.work.getOutputDir();
+ Path tempOutPath = Utilities.toTempPath(new Path(outputPath));
+ try {
+ FileSystem fs = tempOutPath.getFileSystem(job);
+ if (!fs.exists(tempOutPath)) {
+ fs.mkdirs(tempOutPath);
+ }
+ } catch (IOException e) {
+ console.printError("Can't make path " + outputPath + " : " + e.getMessage());
+ return 6;
+ }
+
RCFileBlockMergeOutputFormat.setMergeOutputPath(job, new Path(outputPath));
job.setOutputKeyClass(NullWritable.class);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java?rev=1149047&r1=1149046&r2=1149047&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileMergeMapper.java Thu Jul 21 06:46:17 2011
@@ -61,6 +61,7 @@ public class RCFileMergeMapper extends M
boolean hasDynamicPartitions = false;
boolean tmpPathFixed = false;
Path tmpPath;
+ Path taskTmpPath;
Path dpPath;
public final static Log LOG = LogFactory.getLog("RCFileMergeMapper");
@@ -68,6 +69,7 @@ public class RCFileMergeMapper extends M
public RCFileMergeMapper() {
}
+ @Override
public void configure(JobConf job) {
jc = job;
hasDynamicPartitions = HiveConf.getBoolVar(job,
@@ -75,7 +77,9 @@ public class RCFileMergeMapper extends M
String specPath = RCFileBlockMergeOutputFormat.getMergeOutputPath(job)
.toString();
- updatePaths(Utilities.toTempPath(specPath));
+ Path tmpPath = Utilities.toTempPath(specPath);
+ Path taskTmpPath = Utilities.toTaskTempPath(specPath);
+ updatePaths(tmpPath, taskTmpPath);
try {
fs = (new Path(specPath)).getFileSystem(job);
autoDelete = ShimLoader.getHadoopShims().fileSystemDeleteOnExit(fs,
@@ -86,11 +90,12 @@ public class RCFileMergeMapper extends M
}
}
- private void updatePaths(Path tmpPath) {
+ private void updatePaths(Path tmpPath, Path taskTmpPath) {
String taskId = Utilities.getTaskId(jc);
this.tmpPath = tmpPath;
+ this.taskTmpPath = taskTmpPath;
finalPath = new Path(tmpPath, taskId);
- outPath = new Path(tmpPath, Utilities.toTempPath(taskId));
+ outPath = new Path(taskTmpPath, Utilities.toTempPath(taskId));
}
@Override
@@ -163,9 +168,10 @@ public class RCFileMergeMapper extends M
*
* @param inputPath
* @throws HiveException
+ * @throws IOException
*/
private void fixTmpPath(Path inputPath)
- throws HiveException {
+ throws HiveException, IOException {
dpPath = inputPath;
Path newPath = new Path(".");
int inputDepth = inputPath.depth();
@@ -177,9 +183,16 @@ public class RCFileMergeMapper extends M
inputDepth--;
inputPath = inputPath.getParent();
}
- updatePaths(new Path(tmpPath, newPath));
+
+ Path newTmpPath = new Path(tmpPath, newPath);
+ Path newTaskTmpPath = new Path(taskTmpPath, newPath);
+ if (!fs.exists(newTmpPath)) {
+ fs.mkdirs(newTmpPath);
+ }
+ updatePaths(newTmpPath, newTaskTmpPath);
}
+ @Override
public void close() throws IOException {
// close writer
if (outWriter == null) {