You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@hive.apache.org by jd...@apache.org on 2016/05/01 03:31:04 UTC

[10/11] hive git commit: HIVE-13421 : Propagate job progress in operation status

HIVE-13421 : Propagate job progress in operation status


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9e1fa0ce
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9e1fa0ce
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9e1fa0ce

Branch: refs/heads/llap
Commit: 9e1fa0ce6f2003300640f0bee9b267e33d714084
Parents: 9179178
Author: Rajat Khandelwal <pr...@apache.org>
Authored: Sat Apr 30 08:16:51 2016 +0530
Committer: Amareshwari Sriramadasu <am...@apache.org>
Committed: Sat Apr 30 08:16:51 2016 +0530

----------------------------------------------------------------------
 .../service/cli/session/TestQueryDisplay.java    |  4 ++--
 .../java/org/apache/hadoop/hive/ql/Driver.java   | 13 +++++++++++--
 .../org/apache/hadoop/hive/ql/QueryDisplay.java  | 19 +++++++++++++++++--
 .../org/apache/hadoop/hive/ql/QueryPlan.java     | 16 +---------------
 .../hadoop/hive/ql/exec/ConditionalTask.java     |  1 +
 .../org/apache/hadoop/hive/ql/exec/Task.java     | 14 ++++++++++++--
 .../hadoop/hive/ql/exec/mr/ExecDriver.java       |  2 +-
 .../hive/ql/exec/mr/HadoopJobExecHelper.java     |  6 ++++--
 .../apache/hive/service/cli/CLIServiceTest.java  |  4 +++-
 9 files changed, 52 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/9e1fa0ce/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java
index 98581e0..cc18ce7 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java
@@ -154,8 +154,8 @@ public class TestQueryDisplay {
     Assert.assertTrue(qDisplay1.getPerfLogStarts(QueryDisplay.Phase.COMPILATION).size() > 0);
     Assert.assertTrue(qDisplay1.getPerfLogEnds(QueryDisplay.Phase.COMPILATION).size() > 0);
 
-    Assert.assertEquals(qDisplay1.getTaskDisplays().size(), 2);
-    QueryDisplay.TaskDisplay tInfo1 = qDisplay1.getTaskDisplays().get(1);
+    Assert.assertEquals(qDisplay1.getTaskDisplays().size(), 1);
+    QueryDisplay.TaskDisplay tInfo1 = qDisplay1.getTaskDisplays().get(0);
     Assert.assertEquals(tInfo1.getTaskId(), "Stage-0");
     Assert.assertEquals(tInfo1.getTaskType(), StageType.DDL);
     Assert.assertTrue(tInfo1.getBeginTime() > 0 && tInfo1.getBeginTime() <= System.currentTimeMillis());

http://git-wip-us.apache.org/repos/asf/hive/blob/9e1fa0ce/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index dad43fb..32d2cb2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -450,7 +450,7 @@ public class Driver implements CommandProcessor {
       schema = getSchema(sem, conf);
 
       plan = new QueryPlan(queryStr, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN), queryId,
-        queryState.getHiveOperation(), schema, queryDisplay);
+        queryState.getHiveOperation(), schema);
 
       conf.setQueryString(queryStr);
 
@@ -1507,7 +1507,7 @@ public class Driver implements CommandProcessor {
           perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.PRE_HOOK + peh.getClass().getName());
         }
       }
-
+      setQueryDisplays(plan.getRootTasks());
       int mrJobs = Utilities.getMRTasks(plan.getRootTasks()).size();
       int jobs = mrJobs
         + Utilities.getTezTasks(plan.getRootTasks()).size()
@@ -1739,6 +1739,15 @@ public class Driver implements CommandProcessor {
     return (0);
   }
 
+  private void setQueryDisplays(List<Task<? extends Serializable>> tasks) {
+    if (tasks != null) {
+      for (Task<? extends Serializable> task : tasks) {
+        task.setQueryDisplay(queryDisplay);
+        setQueryDisplays(task.getDependentTasks());
+      }
+    }
+  }
+
   private void logMrWarning(int mrJobs) {
     if (mrJobs <= 0 || !("mr".equals(HiveConf.getVar(conf, ConfVars.HIVE_EXECUTION_ENGINE)))) {
       return;

http://git-wip-us.apache.org/repos/asf/hive/blob/9e1fa0ce/ql/src/java/org/apache/hadoop/hive/ql/QueryDisplay.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryDisplay.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryDisplay.java
index d582bc0..703e997 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/QueryDisplay.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryDisplay.java
@@ -79,6 +79,8 @@ public class QueryDisplay {
     private String name;
     private boolean requireLock;
     private boolean retryIfFail;
+    private String statusMessage;
+
     // required for jackson
     public TaskDisplay() {
 
@@ -158,15 +160,28 @@ public class QueryDisplay {
       if (externalHandle == null && tTask.getExternalHandle() != null) {
         this.externalHandle = tTask.getExternalHandle();
       }
+      setStatusMessage(tTask.getStatusMessage());
       switch (taskState) {
         case RUNNING:
-          beginTime = System.currentTimeMillis();
+          if (beginTime == null) {
+            beginTime = System.currentTimeMillis();
+          }
           break;
         case FINISHED:
-          endTime = System.currentTimeMillis();
+          if (endTime == null) {
+            endTime = System.currentTimeMillis();
+          }
           break;
       }
     }
+
+    public synchronized String getStatusMessage() {
+      return statusMessage;
+    }
+
+    public synchronized void setStatusMessage(String statusMessage) {
+      this.statusMessage = statusMessage;
+    }
   }
   public synchronized void setTaskResult(String taskId, TaskResult result) {
     TaskDisplay taskDisplay = tasks.get(taskId);

http://git-wip-us.apache.org/repos/asf/hive/blob/9e1fa0ce/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
index ef0923d..e8c8ae6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryPlan.java
@@ -113,26 +113,12 @@ public class QueryPlan implements Serializable {
   }
 
   public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime, String queryId,
-                   HiveOperation operation, Schema resultSchema) {
-    this(queryString, sem, startTime, queryId, operation, resultSchema, null);
-  }
-  public QueryPlan(String queryString, BaseSemanticAnalyzer sem, Long startTime, String queryId,
-                  HiveOperation operation, Schema resultSchema, QueryDisplay queryDisplay) {
+                  HiveOperation operation, Schema resultSchema) {
     this.queryString = queryString;
 
     rootTasks = new ArrayList<Task<? extends Serializable>>(sem.getAllRootTasks());
     reducerTimeStatsPerJobList = new ArrayList<ReducerTimeStatsPerJob>();
     fetchTask = sem.getFetchTask();
-    if (queryDisplay != null) {
-      if (fetchTask != null) {
-        fetchTask.setQueryDisplay(queryDisplay);
-      }
-      if (rootTasks!= null) {
-        for (Task t : rootTasks) {
-          t.setQueryDisplay(queryDisplay);
-        }
-      }
-    }
     // Note that inputs and outputs can be changed when the query gets executed
     inputs = sem.getAllInputs();
     outputs = sem.getAllOutputs();

http://git-wip-us.apache.org/repos/asf/hive/blob/9e1fa0ce/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
index c96c813..52cb445 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ConditionalTask.java
@@ -200,6 +200,7 @@ public class ConditionalTask extends Task<ConditionalWork> implements Serializab
   public boolean addDependentTask(Task<? extends Serializable> dependent) {
     boolean ret = false;
     if (getListTasks() != null) {
+      ret = true;
       for (Task<? extends Serializable> tsk : getListTasks()) {
         ret = ret & tsk.addDependentTask(dependent);
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/9e1fa0ce/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
index 897af5e..34bdafd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
@@ -83,8 +83,18 @@ public abstract class Task<T extends Serializable> implements Serializable, Node
   protected String id;
   protected T work;
   private TaskState taskState = TaskState.CREATED;
+  private String statusMessage;
   private transient boolean fetchSource;
 
+  public void setStatusMessage(String statusMessage) {
+    this.statusMessage = statusMessage;
+    updateStatusInQueryDisplay();
+  }
+
+  public String getStatusMessage() {
+    return statusMessage;
+  }
+
   public enum FeedType {
     DYNAMIC_PARTITIONS, // list of dynamic partitions
   }
@@ -138,13 +148,13 @@ public abstract class Task<T extends Serializable> implements Serializable, Node
     this.queryDisplay = queryDisplay;
   }
 
-  private void updateStatusInQueryDisplay() {
+  protected void updateStatusInQueryDisplay() {
     if (queryDisplay != null) {
       queryDisplay.updateTaskStatus(this);
     }
   }
 
-  private void setState(TaskState state) {
+  protected void setState(TaskState state) {
     this.taskState = state;
     updateStatusInQueryDisplay();
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/9e1fa0ce/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index 639b0da..926f6e8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -432,7 +432,7 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
       // Finally SUBMIT the JOB!
       rj = jc.submitJob(job);
       this.jobID = rj.getJobID();
-
+      updateStatusInQueryDisplay();
       returnVal = jobExecHelper.progress(rj, jc);
       success = (returnVal == 0);
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/hive/blob/9e1fa0ce/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
index 760ba6c..11f5cfd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
@@ -367,12 +367,14 @@ public class HadoopJobExecHelper {
         }
       }
       console.printInfo(output);
+      task.setStatusMessage(output);
       reportTime = System.currentTimeMillis();
     }
 
     if (cpuMsec > 0) {
-      console.printInfo("MapReduce Total cumulative CPU time: "
-          + Utilities.formatMsecToStr(cpuMsec));
+      String status = "MapReduce Total cumulative CPU time: " + Utilities.formatMsecToStr(cpuMsec);
+      console.printInfo(status);
+      task.setStatusMessage(status);
     }
 
     boolean success;

http://git-wip-us.apache.org/repos/asf/hive/blob/9e1fa0ce/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
----------------------------------------------------------------------
diff --git a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
index ff7e9a4..fb8ee4c 100644
--- a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
+++ b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
@@ -641,7 +641,8 @@ public abstract class CLIServiceTest {
     SessionHandle sessionHandle = setupTestData(tableName, columnDefinitions, confOverlay);
     assertNotNull(sessionHandle);
     // nonblocking execute
-    String select = "SELECT ID + ' ' FROM TEST_EXEC_ASYNC";
+    String select = "select a.id, b.id from (SELECT ID + ' ' `ID` FROM TEST_EXEC_ASYNC) a full outer join "
+      + "(SELECT ID + ' ' `ID` FROM TEST_EXEC_ASYNC) b on a.ID=b.ID";
     OperationHandle ophandle =
       client.executeStatementAsync(sessionHandle, select, confOverlay);
 
@@ -697,6 +698,7 @@ public abstract class CLIServiceTest {
         case FINISHED:
           if (taskDisplay.getTaskType() == StageType.MAPRED || taskDisplay.getTaskType() == StageType.MAPREDLOCAL) {
             assertNotNull(taskDisplay.getExternalHandle());
+            assertNotNull(taskDisplay.getStatusMessage());
           }
           assertNotNull(taskDisplay.getBeginTime());
           assertNotNull(taskDisplay.getEndTime());