You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by nz...@apache.org on 2010/07/14 06:24:43 UTC
svn commit: r963943 - in /hadoop/hive/trunk: CHANGES.txt
ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
Author: nzhang
Date: Wed Jul 14 04:24:42 2010
New Revision: 963943
URL: http://svn.apache.org/viewvc?rev=963943&view=rev
Log:
HIVE-1462. Report progress in FileSinkOperator for multiple directory case (Siying Dong via Ning Zhang)
Modified:
hadoop/hive/trunk/CHANGES.txt
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=963943&r1=963942&r2=963943&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Wed Jul 14 04:24:42 2010
@@ -27,6 +27,10 @@ Trunk - Unreleased
HIVE-1447. Speed up reflection method calls (Zheng via He Yongqiang)
+ HIVE-1462. Report progress in FileSinkOperator for the multiple directory
+ case
+ (Siying Dong via Ning Zhang)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=963943&r1=963942&r2=963943&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Wed Jul 14 04:24:42 2010
@@ -49,10 +49,10 @@ import org.apache.hadoop.hive.serde2.Ser
import org.apache.hadoop.hive.serde2.Serializer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.SubStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
@@ -100,7 +100,6 @@ public class FileSinkOperator extends Te
Path[] outPaths;
Path[] finalPaths;
RecordWriter[] outWriters;
- int timeOut; // JT timeout in msec.
public FSPaths() {
}
@@ -110,10 +109,6 @@ public class FileSinkOperator extends Te
outPaths = new Path[numFiles];
finalPaths = new Path[numFiles];
outWriters = new RecordWriter[numFiles];
- // Timeout is chosen to make sure that even if one iteration takes more than
- // half of the script.timeout but less than script.timeout, we will still
- // be able to report progress.
- timeOut = hconf.getInt("mapred.healthChecker.script.timeout", 600000)/2;
}
/**
@@ -164,31 +159,12 @@ public class FileSinkOperator extends Te
return outWriters;
}
- /**
- * Report status to JT so that JT won't kill this task if closing takes too long
- * due to too many files to close and the NN is overloaded.
- * @param lastUpdateTime the time (msec) that progress update happened.
- * @return true if a new progress update is reported, false otherwise.
- */
- private boolean updateProgress(long lastUpdateTime) {
- if (reporter != null &&
- (System.currentTimeMillis() - lastUpdateTime) > timeOut) {
- reporter.progress();
- return true;
- } else {
- return false;
- }
- }
-
public void closeWriters(boolean abort) throws HiveException {
- long lastProgressReport = System.currentTimeMillis();
for (int idx = 0; idx < outWriters.length; idx++) {
if (outWriters[idx] != null) {
try {
outWriters[idx].close(abort);
- if (updateProgress(lastProgressReport)) {
- lastProgressReport = System.currentTimeMillis();
- }
+ updateProgress();
} catch (IOException e) {
throw new HiveException(e);
}
@@ -197,16 +173,13 @@ public class FileSinkOperator extends Te
}
private void commit(FileSystem fs) throws HiveException {
- long lastProgressReport = System.currentTimeMillis();
for (int idx = 0; idx < outPaths.length; ++idx) {
try {
if (!fs.rename(outPaths[idx], finalPaths[idx])) {
throw new HiveException("Unable to rename output to: "
+ finalPaths[idx]);
}
- if (updateProgress(lastProgressReport)) {
- lastProgressReport = System.currentTimeMillis();
- }
+ updateProgress();
} catch (IOException e) {
throw new HiveException(e + "Unable to rename output to: "
+ finalPaths[idx]);
@@ -215,7 +188,6 @@ public class FileSinkOperator extends Te
}
public void abortWriters(FileSystem fs, boolean abort, boolean delete) throws HiveException {
- long lastProgressReport = System.currentTimeMillis();
for (int idx = 0; idx < outWriters.length; idx++) {
if (outWriters[idx] != null) {
try {
@@ -223,9 +195,7 @@ public class FileSinkOperator extends Te
if (delete) {
fs.delete(outPaths[idx], true);
}
- if (updateProgress(lastProgressReport)) {
- lastProgressReport = System.currentTimeMillis();
- }
+ updateProgress();
} catch (IOException e) {
throw new HiveException(e);
}
@@ -260,6 +230,8 @@ public class FileSinkOperator extends Te
private transient FSPaths fsp;
private transient boolean bDynParts;
private transient SubStructObjectInspector subSetOI;
+ private transient int timeOut; // JT timeout in msec.
+ private transient long lastProgressReport = System.currentTimeMillis();
/**
* TableIdEnum.
@@ -314,6 +286,11 @@ public class FileSinkOperator extends Te
serializer.initialize(null, conf.getTableInfo().getProperties());
outputClass = serializer.getSerializedClass();
+ // Timeout is chosen to make sure that even if one iteration takes more than
+ // half of the script.timeout but less than script.timeout, we will still
+ // be able to report progress.
+ timeOut = hconf.getInt("mapred.healthChecker.script.timeout", 600000)/2;
+
if (hconf instanceof JobConf) {
jc = (JobConf) hconf;
} else {
@@ -504,6 +481,23 @@ public class FileSinkOperator extends Te
filesCreated = true;
}
+ /**
+ * Report status to JT so that JT won't kill this task if closing takes too long
+ * due to too many files to close and the NN is overloaded.
+ * Uses the lastProgressReport field as the time (msec) of the previous update.
+ * @return true if a new progress update is reported, false otherwise.
+ */
+ private boolean updateProgress() {
+ if (reporter != null &&
+ (System.currentTimeMillis() - lastProgressReport) > timeOut) {
+ reporter.progress();
+ lastProgressReport = System.currentTimeMillis();
+ return true;
+ } else {
+ return false;
+ }
+ }
+
Writable recordValue;
@Override
@@ -632,6 +626,7 @@ public class FileSinkOperator extends Te
createBucketFiles(fsp);
}
+ lastProgressReport = System.currentTimeMillis();
if (!abort) {
for (FSPaths fsp: valToPaths.values()) {
fsp.closeWriters(abort);