You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/01/10 21:29:33 UTC

hive git commit: HIVE-18274 : add AM level metrics for WM (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

Repository: hive
Updated Branches:
  refs/heads/master ad0519008 -> cfd2b149a


HIVE-18274 : add AM level metrics for WM (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cfd2b149
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cfd2b149
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cfd2b149

Branch: refs/heads/master
Commit: cfd2b149a82d3e3dfbd1f0608ace514195ec5cf2
Parents: ad05190
Author: sergey <se...@apache.org>
Authored: Wed Jan 10 12:17:50 2018 -0800
Committer: sergey <se...@apache.org>
Committed: Wed Jan 10 13:29:26 2018 -0800

----------------------------------------------------------------------
 .../tezplugins/LlapTaskSchedulerService.java    | 100 ++++++++++++++-----
 .../metrics/LlapTaskSchedulerMetrics.java       |  75 +++++++++++++-
 2 files changed, 149 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/cfd2b149/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index c16a0a6..4a4915b 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -508,6 +508,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
           + "; the delta to adjust by is " + delta);
       if (delta == 0) return;
       totalGuaranteed = newTotalGuaranteed;
+      if (metrics != null) {
+        metrics.setWmTotalGuaranteed(totalGuaranteed);
+      }
       if (delta > 0) {
         if (unusedGuaranteed == 0) {
           // There may be speculative tasks waiting.
@@ -517,12 +520,18 @@ public class LlapTaskSchedulerService extends TaskScheduler {
           WM_LOG.info("Distributed " + totalUpdated);
         }
         int result = (unusedGuaranteed += delta);
+        if (metrics != null) {
+          metrics.setWmUnusedGuaranteed(result);
+        }
         WM_LOG.info("Setting unused to " + result + " based on remaining delta " + delta);
       } else {
         delta = -delta;
         if (delta <= unusedGuaranteed) {
           // Somebody took away our unwanted ducks.
           int result = (unusedGuaranteed -= delta);
+          if (metrics != null) {
+            metrics.setWmUnusedGuaranteed(result);
+          }
           WM_LOG.info("Setting unused to " + result + " based on full delta " + delta);
           return;
         } else {
@@ -530,6 +539,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
           unusedGuaranteed = 0;
           toUpdate = new ArrayList<>();
           int totalUpdated = revokeGuaranteed(delta, null, toUpdate);
+          if (metrics != null) {
+            metrics.setWmUnusedGuaranteed(0);
+          }
           WM_LOG.info("Setting unused to 0; revoked " + totalUpdated + " / " + delta);
           // We must be able to take away the requisite number; if we can't, where'd the ducks go?
           if (delta != totalUpdated) {
@@ -553,8 +565,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     synchronized (ti) {
       assert ti.isPendingUpdate;
       if (ti.lastSetGuaranteed != null && ti.lastSetGuaranteed == ti.isGuaranteed) {
-        ti.isPendingUpdate = false;
-        ti.requestedValue = null;
+        ti.requestedValue = ti.isGuaranteed;
+        setUpdateDoneUnderTiLock(ti);
         WM_LOG.info("Not sending update to " + ti.attemptId);
         return; // Nothing to do - e.g. two messages have canceled each other before we could react.
       }
@@ -565,17 +577,35 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     sendUpdateMessageAsync(ti, newState);
   }
 
+  private void setUpdateStartedUnderTiLock(TaskInfo ti) {
+    ti.isPendingUpdate = true;
+    ti.requestedValue = ti.isGuaranteed;
+    // Concurrent metrics updates are OK across tasks, not within one; caller holds the ti lock.
+    if (metrics != null) {
+      metrics.setWmPendingStarted(ti.requestedValue);
+    }
+  }
+
+  private void setUpdateDoneUnderTiLock(TaskInfo ti) {
+    ti.isPendingUpdate = false;
+    // Metrics must see requestedValue before it is nulled below; one task's updates must not race.
+    if (metrics != null) {
+      metrics.setWmPendingDone(ti.requestedValue);
+    }
+    ti.lastSetGuaranteed = ti.requestedValue;
+    ti.requestedValue = null;
+  }
+
   @VisibleForTesting
   protected void handleUpdateResult(TaskInfo ti, boolean isOk) {
     // The update options for outside the lock - see below the synchronized block.
     Boolean newStateSameTask = null, newStateAnyTask = null;
     WM_LOG.info("Received response for " + ti.attemptId + ", " + isOk);
-
     synchronized (ti) {
       assert ti.isPendingUpdate;
       if (ti.isGuaranteed == null) {
         // The task has been terminated and the duck accounted for based on local state.
-        // Whatever we were doing is irrelevant.
+        // Whatever we were doing is irrelevant. The metrics have also been updated.
         ti.isPendingUpdate = false;
         ti.requestedValue = null;
         return;
@@ -583,16 +613,15 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       boolean requestedValue = ti.requestedValue;
       if (isOk) {
         // We have propagated the value to the task.
-        ti.lastSetGuaranteed = requestedValue;
-        if (requestedValue == ti.isGuaranteed) {
-          // Looks like we've succeeded at bringing the task state up to date with the local state.
-          ti.isPendingUpdate = false;
-          ti.requestedValue = null;
-          return;
-        }
+        setUpdateDoneUnderTiLock(ti);
+        if (requestedValue == ti.isGuaranteed) return;
         // The state has changed during the update. Let's undo what we just did.
-        newStateSameTask = ti.requestedValue = ti.isGuaranteed;
+        newStateSameTask = ti.isGuaranteed;
+        setUpdateStartedUnderTiLock(ti);
       } else {
+        if (metrics != null) {
+          metrics.setWmPendingFailed(requestedValue);
+        }
         // An error, or couldn't find the task - lastSetGuaranteed does not change. The logic here
         // does not account for one special case - we have updated the task, but the response was
         // lost and we have received a network error. The state could be inconsistent, making
@@ -604,11 +633,14 @@ public class LlapTaskSchedulerService extends TaskScheduler {
           return;
         }
         // We failed to update this task. Instead of retrying for this task, find another.
+        // To change isGuaranteed and modify maps, we'd need the epic lock. So, we will not
+        // update the pending state for now as we release this lock to take both.
         newStateAnyTask = requestedValue;
       }
-    }
+    } // End of synchronized (ti) 
     if (newStateSameTask != null) {
-      WM_LOG.info("Sending update to the same task in response handling " + ti.attemptId + ", " + newStateSameTask);
+      WM_LOG.info("Sending update to the same task in response handling "
+          + ti.attemptId + ", " + newStateSameTask);
 
       // We need to send the state update again (the state has changed since the last one).
       sendUpdateMessageAsync(ti, newStateSameTask);
@@ -621,13 +653,16 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     writeLock.lock();
     try {
       synchronized (ti) {
+        // We have already updated the metrics for the failure; change the state.
         ti.isPendingUpdate = false;
         ti.requestedValue = null;
         if (newStateAnyTask != ti.isGuaranteed) {
-          // The state changed between this and previous check within this method.
+          // The state has changed between this and previous check within this method.
+          // The failed update was rendered irrelevant, so we just exit.
           return;
         }
-        WM_LOG.info("Sending update to a different task in response handling " + ti.attemptId + ", " + newStateAnyTask);
+        WM_LOG.info("Sending update to a different task in response handling "
+            + ti.attemptId + ", " + newStateAnyTask);
         // First, "give up" on this task and put it back in the original list.
         boolean isRemoved = removeFromRunningTaskMap(
             newStateAnyTask ? guaranteedTasks : speculativeTasks, ti.task, ti);
@@ -1045,6 +1080,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
           WM_LOG.error("Task appears to have been deallocated twice: " + task
               + " There may be inconsistencies in guaranteed task counts.");
         } else {
+          if (metrics != null) {
+            metrics.setWmTaskFinished(taskInfo.isGuaranteed, taskInfo.isPendingUpdate);
+          }
           isGuaranteedFreed = taskInfo.isGuaranteed;
           // This tells the pending update (if any) that whatever it is doing is irrelevant,
           // and also makes sure we don't take the duck back twice if this is called twice.
@@ -1196,6 +1234,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     assert updatedCount <= 1;
     if (updatedCount == 0) {
       int result = ++unusedGuaranteed;
+      if (metrics != null) {
+        metrics.setWmUnusedGuaranteed(result);
+      }
       WM_LOG.info("Returning the unused duck; unused is now " + result);
     }
     if (toUpdate.isEmpty()) return null;
@@ -1542,8 +1583,14 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     boolean isGuaranteed = false;
     synchronized (taskInfo) {
       assert !taskInfo.isPendingUpdate;
-      taskInfo.isPendingUpdate = true; // Update w/the request.
-      taskInfo.requestedValue = isGuaranteed = taskInfo.isGuaranteed;
+      // Update is included with the submit request; callback is via notifyStarted.
+      isGuaranteed = taskInfo.isGuaranteed;
+      taskInfo.isPendingUpdate = true;
+      taskInfo.requestedValue = taskInfo.isGuaranteed;
+      if (metrics != null) {
+        metrics.setWmTaskStarted(taskInfo.requestedValue);
+      }
+      setUpdateStartedUnderTiLock(taskInfo);
     }
     TreeMap<Integer, TreeSet<TaskInfo>> runningTasks =
         isGuaranteed ? guaranteedTasks : speculativeTasks;
@@ -1816,6 +1863,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
         WM_LOG.error("The task had guaranteed flag set before scheduling: " + taskInfo);
       } else {
         int result = --unusedGuaranteed;
+        if (metrics != null) {
+          metrics.setWmUnusedGuaranteed(result);
+        }
         WM_LOG.info("Using an unused duck for " + taskInfo.attemptId
             + "; unused is now " + result);
       }
@@ -1985,6 +2035,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     return count - remainingCount;
   }
 
+  // Must be called under the epic lock.
   private boolean findGuaranteedToReallocate(TaskInfo candidate, Ref<TaskInfo> toUpdate) {
     Iterator<Entry<Integer, TreeSet<TaskInfo>>> iterator =
         guaranteedTasks.descendingMap().entrySet().iterator();
@@ -2002,8 +2053,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
           taskInfo.isGuaranteed = false;
           // See the comment in handleUpdateForSinglePriorityLevel.
           if (!taskInfo.isPendingUpdate) {
-            taskInfo.isPendingUpdate = true;
-            taskInfo.requestedValue = taskInfo.isGuaranteed;
+            setUpdateStartedUnderTiLock(taskInfo);
             toUpdate.value = taskInfo;
           }
         }
@@ -2042,8 +2092,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
         // changed state when it's done with whatever it's doing. The updater is not going to
         // give up until the discrepancies are eliminated.
         if (!taskInfo.isPendingUpdate) {
-          taskInfo.isPendingUpdate = true;
-          taskInfo.requestedValue = taskInfo.isGuaranteed;
+          setUpdateStartedUnderTiLock(taskInfo);
           WM_LOG.info("Adding " + taskInfo.attemptId + " to update");
           toUpdate.add(taskInfo);
         } else {
@@ -2067,8 +2116,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
       synchronized (failedUpdate) {
         assert failedUpdate.isGuaranteed != newValue;
         failedUpdate.isGuaranteed = newValue;
-        failedUpdate.isPendingUpdate = true;
-        failedUpdate.requestedValue = failedUpdate.isGuaranteed;
+        setUpdateStartedUnderTiLock(failedUpdate);
       }
       WM_LOG.info("Adding failed " + failedUpdate.attemptId + " to update");
       // Do not check the state - this is coming from the updater under epic lock.
@@ -2678,7 +2726,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
     boolean inDelayedQueue = false;
     private final TezTaskAttemptID attemptId;
 
-    // The state for guaranteed task tracking. Synchronized on 'this'
+    // The state for guaranteed task tracking. Synchronized on 'this'.
+    // In addition, "isGuaranteed" is only modified under the epic lock (because it involves
+    // modifying the corresponding structures that contain the task objects, at the same time).
     /** Local state in the AM; true/false are what they say, null means terminated and irrelevant. */
     private Boolean isGuaranteed = false;
     /** The last state positively propagated to the task. Set by the updater. */

http://git-wip-us.apache.org/repos/asf/hive/blob/cfd2b149/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java
index 478f949..2660218 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java
@@ -50,7 +50,6 @@ import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
  */
 @Metrics(about = "Llap Task Scheduler Metrics", context = "scheduler")
 public class LlapTaskSchedulerMetrics implements MetricsSource {
-
   private final String name;
   private final JvmMetrics jvmMetrics;
   private final String sessionId;
@@ -79,6 +78,18 @@ public class LlapTaskSchedulerMetrics implements MetricsSource {
   MutableCounterInt completedDagcount;
   @Metric
   MutableCounterInt pendingPreemptionTasksCount;
+  @Metric
+  MutableGaugeInt wmUnusedGuaranteedCount;
+  @Metric
+  MutableGaugeInt wmTotalGuaranteedCount;
+  @Metric
+  MutableCounterInt wmSpeculativePendingCount;
+  @Metric
+  MutableCounterInt wmGuaranteedPendingCount;
+  @Metric
+  MutableCounterInt wmSpeculativeCount;
+  @Metric
+  MutableCounterInt wmGuaranteedCount;
 
   private LlapTaskSchedulerMetrics(String displayName, JvmMetrics jm, String sessionId) {
     this.name = displayName;
@@ -172,6 +183,68 @@ public class LlapTaskSchedulerMetrics implements MetricsSource {
     pendingPreemptionTasksCount.incr(-1);
   }
 
+  public void setWmPendingStarted(boolean isGuaranteed) {
+    if (isGuaranteed) {
+      wmSpeculativeCount.incr(-1);
+      wmGuaranteedPendingCount.incr();
+    } else {
+      wmGuaranteedCount.incr(-1);
+      wmSpeculativePendingCount.incr();
+    }
+  }
+
+  public void setWmPendingDone(boolean isGuaranteed) {
+    if (isGuaranteed) {
+      wmGuaranteedPendingCount.incr(-1);
+      wmGuaranteedCount.incr();
+    } else {
+      wmSpeculativePendingCount.incr(-1);
+      wmSpeculativeCount.incr();
+    }
+  }
+
+  public void setWmPendingFailed(boolean requestedGuaranteed) {
+    if (requestedGuaranteed) {
+      wmGuaranteedPendingCount.incr(-1);
+      wmSpeculativeCount.incr();
+    } else {
+      wmSpeculativePendingCount.incr(-1);
+      wmGuaranteedCount.incr();
+    }
+  }
+
+  public void setWmTaskStarted(boolean isGuaranteed) {
+    if (isGuaranteed) {
+      wmGuaranteedPendingCount.incr();
+    } else {
+      wmSpeculativePendingCount.incr();
+    }
+  }
+
+  public void setWmTaskFinished(boolean isGuaranteed, boolean isPendingUpdate) {
+    if (isPendingUpdate) {
+      if (isGuaranteed) {
+        wmGuaranteedPendingCount.incr(-1);
+      } else {
+        wmSpeculativePendingCount.incr(-1);
+      }
+    } else {
+      if (isGuaranteed) {
+        wmGuaranteedCount.incr(-1);
+      } else {
+        wmSpeculativeCount.incr(-1);
+      }
+    }
+  }
+
+  public void setWmTotalGuaranteed(int totalGuaranteed) {
+    wmTotalGuaranteedCount.set(totalGuaranteed);
+  }
+
+  public void setWmUnusedGuaranteed(int unusedGuaranteed) {
+    wmUnusedGuaranteedCount.set(unusedGuaranteed);
+  }
+
   private void getTaskSchedulerStats(MetricsRecordBuilder rb) {
     rb.addGauge(SchedulerClusterNodeCount, clusterNodeCount.value())
         .addGauge(SchedulerExecutorsPerInstance, numExecutors.value())