You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by br...@apache.org on 2014/11/25 21:29:09 UTC
svn commit: r1641686 -
/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
Author: brock
Date: Tue Nov 25 20:29:09 2014
New Revision: 1641686
URL: http://svn.apache.org/r1641686
Log:
HIVE-4009 - CLI Tests fail randomly due to MapReduce LocalJobRunner race condition (Brock reviewed by Szehon)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java?rev=1641686&r1=1641685&r2=1641686&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java Tue Nov 25 20:29:09 2014
@@ -231,7 +231,7 @@ public class HadoopJobExecHelper {
int numMap = -1;
int numReduce = -1;
List<ClientStatsPublisher> clientStatPublishers = getClientStatPublishers();
-
+ final boolean localMode = ShimLoader.getHadoopShims().isLocalMode(job);
Heartbeater heartbeater = new Heartbeater(th.getTxnManager(), job);
while (!rj.isComplete()) {
@@ -250,51 +250,53 @@ public class HadoopJobExecHelper {
initializing = false;
}
- if (!initOutputPrinted) {
- SessionState ss = SessionState.get();
-
- String logMapper;
- String logReducer;
+ if (!localMode) {
+ if (!initOutputPrinted) {
+ SessionState ss = SessionState.get();
+
+ String logMapper;
+ String logReducer;
+
+ TaskReport[] mappers = jc.getMapTaskReports(rj.getID());
+ if (mappers == null) {
+ logMapper = "no information for number of mappers; ";
+ } else {
+ numMap = mappers.length;
+ if (ss != null) {
+ ss.getHiveHistory().setTaskProperty(SessionState.get().getQueryId(), getId(),
+ Keys.TASK_NUM_MAPPERS, Integer.toString(numMap));
+ }
+ logMapper = "number of mappers: " + numMap + "; ";
+ }
- TaskReport[] mappers = jc.getMapTaskReports(rj.getID());
- if (mappers == null) {
- logMapper = "no information for number of mappers; ";
- } else {
- numMap = mappers.length;
- if (ss != null) {
- ss.getHiveHistory().setTaskProperty(SessionState.get().getQueryId(), getId(),
- Keys.TASK_NUM_MAPPERS, Integer.toString(numMap));
+ TaskReport[] reducers = jc.getReduceTaskReports(rj.getID());
+ if (reducers == null) {
+ logReducer = "no information for number of reducers. ";
+ } else {
+ numReduce = reducers.length;
+ if (ss != null) {
+ ss.getHiveHistory().setTaskProperty(SessionState.get().getQueryId(), getId(),
+ Keys.TASK_NUM_REDUCERS, Integer.toString(numReduce));
+ }
+ logReducer = "number of reducers: " + numReduce;
}
- logMapper = "number of mappers: " + numMap + "; ";
+
+ console
+ .printInfo("Hadoop job information for " + getId() + ": " + logMapper + logReducer);
+ initOutputPrinted = true;
}
- TaskReport[] reducers = jc.getReduceTaskReports(rj.getID());
- if (reducers == null) {
- logReducer = "no information for number of reducers. ";
+ RunningJob newRj = jc.getJob(rj.getID());
+ if (newRj == null) {
+ // under exceptional load, hadoop may not be able to look up status
+ // of finished jobs (because it has purged them from memory). From
+ // hive's perspective - it's equivalent to the job having failed.
+ // So raise a meaningful exception
+ throw new IOException("Could not find status of job:" + rj.getID());
} else {
- numReduce = reducers.length;
- if (ss != null) {
- ss.getHiveHistory().setTaskProperty(SessionState.get().getQueryId(), getId(),
- Keys.TASK_NUM_REDUCERS, Integer.toString(numReduce));
- }
- logReducer = "number of reducers: " + numReduce;
+ th.setRunningJob(newRj);
+ rj = newRj;
}
-
- console
- .printInfo("Hadoop job information for " + getId() + ": " + logMapper + logReducer);
- initOutputPrinted = true;
- }
-
- RunningJob newRj = jc.getJob(rj.getID());
- if (newRj == null) {
- // under exceptional load, hadoop may not be able to look up status
- // of finished jobs (because it has purged them from memory). From
- // hive's perspective - it's equivalent to the job having failed.
- // So raise a meaningful exception
- throw new IOException("Could not find status of job:" + rj.getID());
- } else {
- th.setRunningJob(newRj);
- rj = newRj;
}
// If fatal errors happen we should kill the job immediately rather than