You are viewing a plain text version of this content. The canonical link to the original message is available in the HTML version of this archive page.
Posted to commits@hive.apache.org by se...@apache.org on 2018/01/10 21:29:33 UTC
hive git commit: HIVE-18274 : add AM level metrics for WM (Sergey
Shelukhin, reviewed by Prasanth Jayachandran)
Repository: hive
Updated Branches:
refs/heads/master ad0519008 -> cfd2b149a
HIVE-18274 : add AM level metrics for WM (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cfd2b149
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cfd2b149
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cfd2b149
Branch: refs/heads/master
Commit: cfd2b149a82d3e3dfbd1f0608ace514195ec5cf2
Parents: ad05190
Author: sergey <se...@apache.org>
Authored: Wed Jan 10 12:17:50 2018 -0800
Committer: sergey <se...@apache.org>
Committed: Wed Jan 10 13:29:26 2018 -0800
----------------------------------------------------------------------
.../tezplugins/LlapTaskSchedulerService.java | 100 ++++++++++++++-----
.../metrics/LlapTaskSchedulerMetrics.java | 75 +++++++++++++-
2 files changed, 149 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/cfd2b149/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index c16a0a6..4a4915b 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -508,6 +508,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
+ "; the delta to adjust by is " + delta);
if (delta == 0) return;
totalGuaranteed = newTotalGuaranteed;
+ if (metrics != null) {
+ metrics.setWmTotalGuaranteed(totalGuaranteed);
+ }
if (delta > 0) {
if (unusedGuaranteed == 0) {
// There may be speculative tasks waiting.
@@ -517,12 +520,18 @@ public class LlapTaskSchedulerService extends TaskScheduler {
WM_LOG.info("Distributed " + totalUpdated);
}
int result = (unusedGuaranteed += delta);
+ if (metrics != null) {
+ metrics.setWmUnusedGuaranteed(result);
+ }
WM_LOG.info("Setting unused to " + result + " based on remaining delta " + delta);
} else {
delta = -delta;
if (delta <= unusedGuaranteed) {
// Somebody took away our unwanted ducks.
int result = (unusedGuaranteed -= delta);
+ if (metrics != null) {
+ metrics.setWmUnusedGuaranteed(result);
+ }
WM_LOG.info("Setting unused to " + result + " based on full delta " + delta);
return;
} else {
@@ -530,6 +539,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
unusedGuaranteed = 0;
toUpdate = new ArrayList<>();
int totalUpdated = revokeGuaranteed(delta, null, toUpdate);
+ if (metrics != null) {
+ metrics.setWmUnusedGuaranteed(0);
+ }
WM_LOG.info("Setting unused to 0; revoked " + totalUpdated + " / " + delta);
// We must be able to take away the requisite number; if we can't, where'd the ducks go?
if (delta != totalUpdated) {
@@ -553,8 +565,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
synchronized (ti) {
assert ti.isPendingUpdate;
if (ti.lastSetGuaranteed != null && ti.lastSetGuaranteed == ti.isGuaranteed) {
- ti.isPendingUpdate = false;
- ti.requestedValue = null;
+ ti.requestedValue = ti.isGuaranteed;
+ setUpdateDoneUnderTiLock(ti);
WM_LOG.info("Not sending update to " + ti.attemptId);
return; // Nothing to do - e.g. two messages have canceled each other before we could react.
}
@@ -565,17 +577,35 @@ public class LlapTaskSchedulerService extends TaskScheduler {
sendUpdateMessageAsync(ti, newState);
}
+ private void setUpdateStartedUnderTiLock(TaskInfo ti) {
+ ti.isPendingUpdate = true;
+ ti.requestedValue = ti.isGuaranteed;
+ // It's ok to update metrics for two tasks in parallel, but not for the same one.
+ if (metrics != null) {
+ metrics.setWmPendingStarted(ti.requestedValue);
+ }
+ }
+
+ private void setUpdateDoneUnderTiLock(TaskInfo ti) {
+ ti.isPendingUpdate = false;
+ // It's ok to update metrics for two tasks in parallel, but not for the same one.
+ if (metrics != null) {
+ metrics.setWmPendingDone(ti.requestedValue);
+ }
+ ti.lastSetGuaranteed = ti.requestedValue;
+ ti.requestedValue = null;
+ }
+
@VisibleForTesting
protected void handleUpdateResult(TaskInfo ti, boolean isOk) {
// The update options for outside the lock - see below the synchronized block.
Boolean newStateSameTask = null, newStateAnyTask = null;
WM_LOG.info("Received response for " + ti.attemptId + ", " + isOk);
-
synchronized (ti) {
assert ti.isPendingUpdate;
if (ti.isGuaranteed == null) {
// The task has been terminated and the duck accounted for based on local state.
- // Whatever we were doing is irrelevant.
+ // Whatever we were doing is irrelevant. The metrics have also been updated.
ti.isPendingUpdate = false;
ti.requestedValue = null;
return;
@@ -583,16 +613,15 @@ public class LlapTaskSchedulerService extends TaskScheduler {
boolean requestedValue = ti.requestedValue;
if (isOk) {
// We have propagated the value to the task.
- ti.lastSetGuaranteed = requestedValue;
- if (requestedValue == ti.isGuaranteed) {
- // Looks like we've succeeded at bringing the task state up to date with the local state.
- ti.isPendingUpdate = false;
- ti.requestedValue = null;
- return;
- }
+ setUpdateDoneUnderTiLock(ti);
+ if (requestedValue == ti.isGuaranteed) return;
// The state has changed during the update. Let's undo what we just did.
- newStateSameTask = ti.requestedValue = ti.isGuaranteed;
+ newStateSameTask = ti.isGuaranteed;
+ setUpdateStartedUnderTiLock(ti);
} else {
+ if (metrics != null) {
+ metrics.setWmPendingFailed(requestedValue);
+ }
// An error, or couldn't find the task - lastSetGuaranteed does not change. The logic here
// does not account for one special case - we have updated the task, but the response was
// lost and we have received a network error. The state could be inconsistent, making
@@ -604,11 +633,14 @@ public class LlapTaskSchedulerService extends TaskScheduler {
return;
}
// We failed to update this task. Instead of retrying for this task, find another.
+ // To change isGuaranteed and modify maps, we'd need the epic lock. So, we will not
+ // update the pending state for now as we release this lock to take both.
newStateAnyTask = requestedValue;
}
- }
+ } // End of synchronized (ti)
if (newStateSameTask != null) {
- WM_LOG.info("Sending update to the same task in response handling " + ti.attemptId + ", " + newStateSameTask);
+ WM_LOG.info("Sending update to the same task in response handling "
+ + ti.attemptId + ", " + newStateSameTask);
// We need to send the state update again (the state has changed since the last one).
sendUpdateMessageAsync(ti, newStateSameTask);
@@ -621,13 +653,16 @@ public class LlapTaskSchedulerService extends TaskScheduler {
writeLock.lock();
try {
synchronized (ti) {
+ // We have already updated the metrics for the failure; change the state.
ti.isPendingUpdate = false;
ti.requestedValue = null;
if (newStateAnyTask != ti.isGuaranteed) {
- // The state changed between this and previous check within this method.
+ // The state has changed between this and previous check within this method.
+ // The failed update was rendered irrelevant, so we just exit.
return;
}
- WM_LOG.info("Sending update to a different task in response handling " + ti.attemptId + ", " + newStateAnyTask);
+ WM_LOG.info("Sending update to a different task in response handling "
+ + ti.attemptId + ", " + newStateAnyTask);
// First, "give up" on this task and put it back in the original list.
boolean isRemoved = removeFromRunningTaskMap(
newStateAnyTask ? guaranteedTasks : speculativeTasks, ti.task, ti);
@@ -1045,6 +1080,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
WM_LOG.error("Task appears to have been deallocated twice: " + task
+ " There may be inconsistencies in guaranteed task counts.");
} else {
+ if (metrics != null) {
+ metrics.setWmTaskFinished(taskInfo.isGuaranteed, taskInfo.isPendingUpdate);
+ }
isGuaranteedFreed = taskInfo.isGuaranteed;
// This tells the pending update (if any) that whatever it is doing is irrelevant,
// and also makes sure we don't take the duck back twice if this is called twice.
@@ -1196,6 +1234,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
assert updatedCount <= 1;
if (updatedCount == 0) {
int result = ++unusedGuaranteed;
+ if (metrics != null) {
+ metrics.setWmUnusedGuaranteed(result);
+ }
WM_LOG.info("Returning the unused duck; unused is now " + result);
}
if (toUpdate.isEmpty()) return null;
@@ -1542,8 +1583,14 @@ public class LlapTaskSchedulerService extends TaskScheduler {
boolean isGuaranteed = false;
synchronized (taskInfo) {
assert !taskInfo.isPendingUpdate;
- taskInfo.isPendingUpdate = true; // Update w/the request.
- taskInfo.requestedValue = isGuaranteed = taskInfo.isGuaranteed;
+ // Update is included with the submit request; callback is via notifyStarted.
+ isGuaranteed = taskInfo.isGuaranteed;
+ taskInfo.isPendingUpdate = true;
+ taskInfo.requestedValue = taskInfo.isGuaranteed;
+ if (metrics != null) {
+ metrics.setWmTaskStarted(taskInfo.requestedValue);
+ }
+ setUpdateStartedUnderTiLock(taskInfo);
}
TreeMap<Integer, TreeSet<TaskInfo>> runningTasks =
isGuaranteed ? guaranteedTasks : speculativeTasks;
@@ -1816,6 +1863,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
WM_LOG.error("The task had guaranteed flag set before scheduling: " + taskInfo);
} else {
int result = --unusedGuaranteed;
+ if (metrics != null) {
+ metrics.setWmUnusedGuaranteed(result);
+ }
WM_LOG.info("Using an unused duck for " + taskInfo.attemptId
+ "; unused is now " + result);
}
@@ -1985,6 +2035,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
return count - remainingCount;
}
+ // Must be called under the epic lock.
private boolean findGuaranteedToReallocate(TaskInfo candidate, Ref<TaskInfo> toUpdate) {
Iterator<Entry<Integer, TreeSet<TaskInfo>>> iterator =
guaranteedTasks.descendingMap().entrySet().iterator();
@@ -2002,8 +2053,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
taskInfo.isGuaranteed = false;
// See the comment in handleUpdateForSinglePriorityLevel.
if (!taskInfo.isPendingUpdate) {
- taskInfo.isPendingUpdate = true;
- taskInfo.requestedValue = taskInfo.isGuaranteed;
+ setUpdateStartedUnderTiLock(taskInfo);
toUpdate.value = taskInfo;
}
}
@@ -2042,8 +2092,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
// changed state when it's done with whatever it's doing. The updater is not going to
// give up until the discrepancies are eliminated.
if (!taskInfo.isPendingUpdate) {
- taskInfo.isPendingUpdate = true;
- taskInfo.requestedValue = taskInfo.isGuaranteed;
+ setUpdateStartedUnderTiLock(taskInfo);
WM_LOG.info("Adding " + taskInfo.attemptId + " to update");
toUpdate.add(taskInfo);
} else {
@@ -2067,8 +2116,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
synchronized (failedUpdate) {
assert failedUpdate.isGuaranteed != newValue;
failedUpdate.isGuaranteed = newValue;
- failedUpdate.isPendingUpdate = true;
- failedUpdate.requestedValue = failedUpdate.isGuaranteed;
+ setUpdateStartedUnderTiLock(failedUpdate);
}
WM_LOG.info("Adding failed " + failedUpdate.attemptId + " to update");
// Do not check the state - this is coming from the updater under epic lock.
@@ -2678,7 +2726,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
boolean inDelayedQueue = false;
private final TezTaskAttemptID attemptId;
- // The state for guaranteed task tracking. Synchronized on 'this'
+ // The state for guaranteed task tracking. Synchronized on 'this'.
+ // In addition, "isGuaranteed" is only modified under the epic lock (because it involves
+ // modifying the corresponding structures that contain the task objects, at the same time).
/** Local state in the AM; true/false are what they say, null means terminated and irrelevant. */
private Boolean isGuaranteed = false;
/** The last state positively propagated to the task. Set by the updater. */
http://git-wip-us.apache.org/repos/asf/hive/blob/cfd2b149/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java
index 478f949..2660218 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/metrics/LlapTaskSchedulerMetrics.java
@@ -50,7 +50,6 @@ import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
*/
@Metrics(about = "Llap Task Scheduler Metrics", context = "scheduler")
public class LlapTaskSchedulerMetrics implements MetricsSource {
-
private final String name;
private final JvmMetrics jvmMetrics;
private final String sessionId;
@@ -79,6 +78,18 @@ public class LlapTaskSchedulerMetrics implements MetricsSource {
MutableCounterInt completedDagcount;
@Metric
MutableCounterInt pendingPreemptionTasksCount;
+ @Metric
+ MutableGaugeInt wmUnusedGuaranteedCount;
+ @Metric
+ MutableGaugeInt wmTotalGuaranteedCount;
+ @Metric
+ MutableCounterInt wmSpeculativePendingCount;
+ @Metric
+ MutableCounterInt wmGuaranteedPendingCount;
+ @Metric
+ MutableCounterInt wmSpeculativeCount;
+ @Metric
+ MutableCounterInt wmGuaranteedCount;
private LlapTaskSchedulerMetrics(String displayName, JvmMetrics jm, String sessionId) {
this.name = displayName;
@@ -172,6 +183,68 @@ public class LlapTaskSchedulerMetrics implements MetricsSource {
pendingPreemptionTasksCount.incr(-1);
}
+ public void setWmPendingStarted(boolean isGuaranteed) {
+ if (isGuaranteed) {
+ wmSpeculativeCount.incr(-1);
+ wmGuaranteedPendingCount.incr();
+ } else {
+ wmGuaranteedCount.incr(-1);
+ wmSpeculativePendingCount.incr();
+ }
+ }
+
+ public void setWmPendingDone(boolean isGuaranteed) {
+ if (isGuaranteed) {
+ wmGuaranteedPendingCount.incr(-1);
+ wmGuaranteedCount.incr();
+ } else {
+ wmSpeculativePendingCount.incr(-1);
+ wmSpeculativeCount.incr();
+ }
+ }
+
+ public void setWmPendingFailed(boolean requestedGuaranteed) {
+ if (requestedGuaranteed) {
+ wmGuaranteedPendingCount.incr(-1);
+ wmSpeculativeCount.incr();
+ } else {
+ wmSpeculativePendingCount.incr(-1);
+ wmGuaranteedCount.incr();
+ }
+ }
+
+ public void setWmTaskStarted(boolean isGuaranteed) {
+ if (isGuaranteed) {
+ wmGuaranteedPendingCount.incr();
+ } else {
+ wmSpeculativePendingCount.incr();
+ }
+ }
+
+ public void setWmTaskFinished(boolean isGuaranteed, boolean isPendingUpdate) {
+ if (isPendingUpdate) {
+ if (isGuaranteed) {
+ wmGuaranteedPendingCount.incr(-1);
+ } else {
+ wmSpeculativePendingCount.incr(-1);
+ }
+ } else {
+ if (isGuaranteed) {
+ wmGuaranteedCount.incr(-1);
+ } else {
+ wmSpeculativeCount.incr(-1);
+ }
+ }
+ }
+
+ public void setWmTotalGuaranteed(int totalGuaranteed) {
+ wmTotalGuaranteedCount.set(totalGuaranteed);
+ }
+
+ public void setWmUnusedGuaranteed(int unusedGuaranteed) {
+ wmUnusedGuaranteedCount.set(unusedGuaranteed);
+ }
+
private void getTaskSchedulerStats(MetricsRecordBuilder rb) {
rb.addGauge(SchedulerClusterNodeCount, clusterNodeCount.value())
.addGauge(SchedulerExecutorsPerInstance, numExecutors.value())