You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by br...@apache.org on 2014/11/25 21:29:09 UTC

svn commit: r1641686 - /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java

Author: brock
Date: Tue Nov 25 20:29:09 2014
New Revision: 1641686

URL: http://svn.apache.org/r1641686
Log:
HIVE-4009 - CLI Tests fail randomly due to MapReduce LocalJobRunner race condition (Brock reviewed by Szehon)

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java?rev=1641686&r1=1641685&r2=1641686&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java Tue Nov 25 20:29:09 2014
@@ -231,7 +231,7 @@ public class HadoopJobExecHelper {
     int numMap = -1;
     int numReduce = -1;
     List<ClientStatsPublisher> clientStatPublishers = getClientStatPublishers();
-
+    final boolean localMode = ShimLoader.getHadoopShims().isLocalMode(job);
     Heartbeater heartbeater = new Heartbeater(th.getTxnManager(), job);
 
     while (!rj.isComplete()) {
@@ -250,51 +250,53 @@ public class HadoopJobExecHelper {
         initializing = false;
       }
 
-      if (!initOutputPrinted) {
-        SessionState ss = SessionState.get();
-
-        String logMapper;
-        String logReducer;
+      if (!localMode) {
+        if (!initOutputPrinted) {
+          SessionState ss = SessionState.get();
+
+          String logMapper;
+          String logReducer;
+
+          TaskReport[] mappers = jc.getMapTaskReports(rj.getID());
+          if (mappers == null) {
+            logMapper = "no information for number of mappers; ";
+          } else {
+            numMap = mappers.length;
+            if (ss != null) {
+              ss.getHiveHistory().setTaskProperty(SessionState.get().getQueryId(), getId(),
+                Keys.TASK_NUM_MAPPERS, Integer.toString(numMap));
+            }
+            logMapper = "number of mappers: " + numMap + "; ";
+          }
 
-        TaskReport[] mappers = jc.getMapTaskReports(rj.getID());
-        if (mappers == null) {
-          logMapper = "no information for number of mappers; ";
-        } else {
-          numMap = mappers.length;
-          if (ss != null) {
-            ss.getHiveHistory().setTaskProperty(SessionState.get().getQueryId(), getId(),
-              Keys.TASK_NUM_MAPPERS, Integer.toString(numMap));
+          TaskReport[] reducers = jc.getReduceTaskReports(rj.getID());
+          if (reducers == null) {
+            logReducer = "no information for number of reducers. ";
+          } else {
+            numReduce = reducers.length;
+            if (ss != null) {
+              ss.getHiveHistory().setTaskProperty(SessionState.get().getQueryId(), getId(),
+                Keys.TASK_NUM_REDUCERS, Integer.toString(numReduce));
+            }
+            logReducer = "number of reducers: " + numReduce;
           }
-          logMapper = "number of mappers: " + numMap + "; ";
+
+          console
+              .printInfo("Hadoop job information for " + getId() + ": " + logMapper + logReducer);
+          initOutputPrinted = true;
         }
 
-        TaskReport[] reducers = jc.getReduceTaskReports(rj.getID());
-        if (reducers == null) {
-          logReducer = "no information for number of reducers. ";
+        RunningJob newRj = jc.getJob(rj.getID());
+        if (newRj == null) {
+          // under exceptional load, hadoop may not be able to look up status
+          // of finished jobs (because it has purged them from memory). From
+          // hive's perspective - it's equivalent to the job having failed.
+          // So raise a meaningful exception
+          throw new IOException("Could not find status of job:" + rj.getID());
         } else {
-          numReduce = reducers.length;
-          if (ss != null) {
-            ss.getHiveHistory().setTaskProperty(SessionState.get().getQueryId(), getId(),
-              Keys.TASK_NUM_REDUCERS, Integer.toString(numReduce));
-          }
-          logReducer = "number of reducers: " + numReduce;
+          th.setRunningJob(newRj);
+          rj = newRj;
         }
-
-        console
-            .printInfo("Hadoop job information for " + getId() + ": " + logMapper + logReducer);
-        initOutputPrinted = true;
-      }
-
-      RunningJob newRj = jc.getJob(rj.getID());
-      if (newRj == null) {
-        // under exceptional load, hadoop may not be able to look up status
-        // of finished jobs (because it has purged them from memory). From
-        // hive's perspective - it's equivalent to the job having failed.
-        // So raise a meaningful exception
-        throw new IOException("Could not find status of job:" + rj.getID());
-      } else {
-        th.setRunningJob(newRj);
-        rj = newRj;
       }
 
       // If fatal errors happen we should kill the job immediately rather than