You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2016/12/02 09:44:09 UTC
hive git commit: HIVE-15242: LLAP: Act on Node update notifications
from registry, fix isAlive checks (Siddharth Seth,
Sergey Shelukhin reviewed by Prasanth Jayachandran)
Repository: hive
Updated Branches:
refs/heads/master d53aa79ca -> 4e0754681
HIVE-15242: LLAP: Act on Node update notifications from registry, fix isAlive checks (Siddharth Seth, Sergey Shelukhin reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4e075468
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4e075468
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4e075468
Branch: refs/heads/master
Commit: 4e0754681cbf4d0f72ddc958e1999b1408ff9684
Parents: d53aa79
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Fri Dec 2 01:43:54 2016 -0800
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Fri Dec 2 01:43:54 2016 -0800
----------------------------------------------------------------------
.../hive/llap/registry/ServiceInstance.java | 7 -
.../hive/llap/registry/ServiceInstanceSet.java | 5 +
.../registry/impl/InactiveServiceInstance.java | 5 -
.../registry/impl/LlapFixedRegistryImpl.java | 5 -
.../impl/LlapZookeeperRegistryImpl.java | 22 +-
.../hadoop/hive/llap/LlapBaseInputFormat.java | 4 +-
.../tezplugins/LlapTaskSchedulerService.java | 335 +++++++++----------
.../TestLlapTaskSchedulerService.java | 2 +-
8 files changed, 176 insertions(+), 209 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/4e075468/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java
index 9004d3c..081995c 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java
@@ -66,13 +66,6 @@ public interface ServiceInstance {
*/
public int getOutputFormatPort();
- /**
- * Return the last known state (without refreshing)
- *
- * @return
- */
-
- public boolean isAlive();
/**
* Config properties of the Service Instance (llap.daemon.*)
http://git-wip-us.apache.org/repos/asf/hive/blob/4e075468/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
index 1e8c895..0544038 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
@@ -16,6 +16,11 @@ package org.apache.hadoop.hive.llap.registry;
import java.util.Collection;
import java.util.Set;
+/**
+ * Note: For most of the implementations, there's no guarantee that the ServiceInstance returned by
+ * one invocation is the same as the instance returned by another invocation. e.g. the ZK registry
+ * returns a new ServiceInstance object each time a getInstance call is made.
+ */
public interface ServiceInstanceSet {
/**
http://git-wip-us.apache.org/repos/asf/hive/blob/4e075468/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java
index 79b7d51..9f2f3b4 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java
@@ -31,11 +31,6 @@ public class InactiveServiceInstance implements ServiceInstance {
}
@Override
- public boolean isAlive() {
- return false;
- }
-
- @Override
public String getHost() {
return null;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/4e075468/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
index bbfcbf6..10ff82e 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
@@ -184,11 +184,6 @@ public class LlapFixedRegistryImpl implements ServiceRegistry {
}
@Override
- public boolean isAlive() {
- return true;
- }
-
- @Override
public Map<String, String> getProperties() {
Map<String, String> properties = new HashMap<>(srv);
// no worker identity
http://git-wip-us.apache.org/repos/asf/hive/blob/4e075468/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
index 59f7c9e..525aadb 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
@@ -22,12 +22,9 @@ import java.net.URL;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -60,7 +57,6 @@ import org.apache.hadoop.hive.llap.registry.ServiceInstance;
import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
import org.apache.hadoop.hive.llap.registry.ServiceInstanceStateChangeListener;
import org.apache.hadoop.hive.llap.registry.ServiceRegistry;
-import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.registry.client.binding.RegistryUtils.ServiceRecordMarshal;
@@ -404,7 +400,6 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
private class DynamicServiceInstance implements ServiceInstance {
private final ServiceRecord srv;
- private boolean alive = true;
private final String host;
private final int rpcPort;
private final int mngPort;
@@ -470,17 +465,6 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
}
@Override
- public boolean isAlive() {
- return alive;
- }
-
- public void kill() {
- // May be possible to generate a notification back to the scheduler from here.
- LOG.info("Killing service instance: " + this);
- this.alive = false;
- }
-
- @Override
public Map<String, String> getProperties() {
return srv.attributes();
}
@@ -494,7 +478,7 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
@Override
public String toString() {
- return "DynamicServiceInstance [alive=" + alive + ", host=" + host + ":" + rpcPort +
+ return "DynamicServiceInstance [id=" + getWorkerIdentity() + ", host=" + host + ":" + rpcPort +
" with resources=" + getResource() + ", shufflePort=" + getShufflePort() +
", servicesAddress=" + getServicesAddress() + ", mgmtPort=" + getManagementPort() + "]";
}
@@ -509,8 +493,8 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
return outputFormatPort;
}
- // Relying on the identity hashCode and equality, since refreshing instances retains the old copy
- // of an already known instance.
+ // TODO: This needs a hashCode/equality implementation if used as a key in various structures.
+ // A new ServiceInstance is created each time.
}
private class DynamicServiceInstanceSet implements ServiceInstanceSet {
http://git-wip-us.apache.org/repos/asf/hive/blob/4e075468/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
index 288a8eb..15a81da 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
@@ -279,9 +279,7 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
// Get the first live service instance
for (ServiceInstance serviceInstance : serviceInstances) {
- if (serviceInstance.isAlive()) {
- return serviceInstance;
- }
+ return serviceInstance;
}
LOG.info("No live service instances were found");
http://git-wip-us.apache.org/repos/asf/hive/blob/4e075468/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index 3f0dde5..158772b 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -18,6 +18,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
@@ -70,6 +71,8 @@ import org.apache.hadoop.util.JvmPauseMonitor;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.Clock;
@@ -82,6 +85,7 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.serviceplugins.api.DagInfo;
import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.serviceplugins.api.TaskScheduler;
@@ -316,8 +320,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
registry.registerStateChangeListener(new NodeStateChangeListener());
activeInstances = registry.getInstances();
for (ServiceInstance inst : activeInstances.getAll()) {
- addNode(inst, new NodeInfo(inst, nodeBlacklistConf, clock, numSchedulableTasksPerNode,
- metrics));
+ addNode(new NodeInfo(inst, nodeBlacklistConf, clock,
+ numSchedulableTasksPerNode, metrics), inst);
}
} finally {
writeLock.unlock();
@@ -329,22 +333,31 @@ public class LlapTaskSchedulerService extends TaskScheduler {
@Override
public void onCreate(ServiceInstance serviceInstance) {
- addNode(serviceInstance, new NodeInfo(serviceInstance, nodeBlacklistConf, clock,
- numSchedulableTasksPerNode, metrics));
- LOG.info("Added node with identity: {}", serviceInstance.getWorkerIdentity());
+ LOG.info("Added node with identity: {} as a result of registry callback",
+ serviceInstance.getWorkerIdentity());
+ addNode(new NodeInfo(serviceInstance, nodeBlacklistConf, clock,
+ numSchedulableTasksPerNode, metrics), serviceInstance);
}
@Override
public void onUpdate(ServiceInstance serviceInstance) {
- instanceToNodeMap.put(serviceInstance.getWorkerIdentity(), new NodeInfo(serviceInstance,
- nodeBlacklistConf, clock, numSchedulableTasksPerNode, metrics));
- LOG.info("Updated node with identity: {}", serviceInstance.getWorkerIdentity());
+ // TODO In what situations will this be invoked?
+ LOG.warn(
+ "Not expecing Updates from the registry. Received update for instance={}. Ignoring",
+ serviceInstance);
+// Replacing NodeInfo means we end up discarding whatever state was known about that node.
+// instanceToNodeMap.put(serviceInstance.getWorkerIdentity(), new NodeInfo(serviceInstance,
+// nodeBlacklistConf, clock, numSchedulableTasksPerNode, metrics));
+//
+//
+// LOG.info("Updated node with identity: {}", serviceInstance.getWorkerIdentity());
}
@Override
public void onRemove(ServiceInstance serviceInstance) {
- // FIXME: disabling this for now
- // instanceToNodeMap.remove(serviceInstance.getWorkerIdentity());
+ NodeReport nodeReport = constructNodeReport(serviceInstance, false);
+ getContext().nodesUpdated(Collections.singletonList(nodeReport));
+ instanceToNodeMap.remove(serviceInstance.getWorkerIdentity());
LOG.info("Removed node with identity: {}", serviceInstance.getWorkerIdentity());
if (metrics != null) {
metrics.setClusterNodeCount(activeInstances.size());
@@ -447,12 +460,10 @@ public class LlapTaskSchedulerService extends TaskScheduler {
try {
int numInstancesFound = 0;
for (ServiceInstance inst : activeInstances.getAll()) {
- if (inst.isAlive()) {
- Resource r = inst.getResource();
- memory += r.getMemory();
- vcores += r.getVirtualCores();
- numInstancesFound++;
- }
+ Resource r = inst.getResource();
+ memory += r.getMemory();
+ vcores += r.getVirtualCores();
+ numInstancesFound++;
}
if (LOG.isDebugEnabled()) {
LOG.debug("GetTotalResources: numInstancesFound={}, totalMem={}, totalVcores={}",
@@ -475,19 +486,26 @@ public class LlapTaskSchedulerService extends TaskScheduler {
// need a state store eventually for current state & measure backoffs
int memory = 0;
int vcores = 0;
+
readLock.lock();
try {
- for (Entry<String, NodeInfo> entry : instanceToNodeMap.entrySet()) {
- if (entry.getValue().getServiceInstance().isAlive() && !entry.getValue().isDisabled()) {
- Resource r = entry.getValue().getServiceInstance().getResource();
+ int numInstancesFound = 0;
+ for (ServiceInstance inst : activeInstances.getAll()) {
+ NodeInfo nodeInfo = instanceToNodeMap.get(inst.getWorkerIdentity());
+ if (nodeInfo != null && !nodeInfo.isDisabled()) {
+ Resource r = inst.getResource();
memory += r.getMemory();
vcores += r.getVirtualCores();
+ numInstancesFound++;
}
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("GetAvailableResources: numInstancesFound={}, totalMem={}, totalVcores={}",
+ numInstancesFound, memory, vcores);
+ }
} finally {
readLock.unlock();
}
-
return Resource.newInstance(memory, vcores);
}
@@ -495,13 +513,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
public int getClusterNodeCount() {
readLock.lock();
try {
- int n = 0;
- for (ServiceInstance inst : activeInstances.getAll()) {
- if (inst.isAlive()) {
- n++;
- }
- }
- return n;
+ return activeInstances.getAll().size();
} finally {
readLock.unlock();
}
@@ -516,17 +528,22 @@ public class LlapTaskSchedulerService extends TaskScheduler {
metrics.incrCompletedDagCount();
}
dagStats = new StatsPerDag();
+ // TODO Cleanup pending tasks etc, so that the next dag is not affected.
}
@Override
public void blacklistNode(NodeId nodeId) {
LOG.info("BlacklistNode not supported");
+ // TODO Disable blacklisting in Tez when using LLAP, until this is properly supported.
+ // Blacklisting can cause containers to move to a terminating state, which can cause attempts to be marked as failed.
+ // This becomes problematic when we set #allowedFailures to 0
// TODO HIVE-13484 What happens when we try scheduling a task on a node that Tez at this point thinks is blacklisted.
}
@Override
public void unblacklistNode(NodeId nodeId) {
LOG.info("unBlacklistNode not supported");
+ // TODO: See comments under blacklistNode.
}
@Override
@@ -598,26 +615,20 @@ public class LlapTaskSchedulerService extends TaskScheduler {
}
return false;
}
- ServiceInstance assignedInstance = taskInfo.assignedInstance;
- assert assignedInstance != null;
-
- NodeInfo nodeInfo = instanceToNodeMap.get(assignedInstance.getWorkerIdentity());
+ NodeInfo nodeInfo = taskInfo.assignedNode;
assert nodeInfo != null;
-
LOG.info("Processing de-allocate request for task={}, state={}, endReason={}", taskInfo.task,
taskInfo.getState(), endReason);
// Re-enable the node if preempted
if (taskInfo.getState() == TaskInfo.State.PREEMPTED) {
LOG.info("Processing deallocateTask for {} which was preempted, EndReason={}", task, endReason);
- unregisterPendingPreemption(taskInfo.assignedInstance.getHost());
+ unregisterPendingPreemption(taskInfo.assignedNode.getHost());
nodeInfo.registerUnsuccessfulTaskEnd(true);
if (nodeInfo.isDisabled()) {
- // Re-enable the node. If a task succeeded, a slot may have become available.
- // Also reset commFailures since a task was able to communicate back and indicate success.
- nodeInfo.enableNode();
- // Re-insert into the queue to force the poll thread to remove the element.
- reinsertNodeInfo(nodeInfo);
+ // Re-enable the node, if a task completed due to preemption. Capacity has become available,
+ // and we may have been able to communicate with the node.
+ queueNodeForReEnablement(nodeInfo);
}
// In case of success, trigger a scheduling run for pending tasks.
trySchedulingPendingTasks();
@@ -631,9 +642,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
if (nodeInfo.isDisabled()) {
// Re-enable the node. If a task succeeded, a slot may have become available.
// Also reset commFailures since a task was able to communicate back and indicate success.
- nodeInfo.enableNode();
- // Re-insert into the queue to force the poll thread to remove the element.
- reinsertNodeInfo(nodeInfo);
+ queueNodeForReEnablement(nodeInfo);
}
// In case of success, trigger a scheduling run for pending tasks.
trySchedulingPendingTasks();
@@ -644,21 +653,21 @@ public class LlapTaskSchedulerService extends TaskScheduler {
.of(TaskAttemptEndReason.EXECUTOR_BUSY, TaskAttemptEndReason.COMMUNICATION_ERROR)
.contains(endReason)) {
if (endReason == TaskAttemptEndReason.COMMUNICATION_ERROR) {
- dagStats.registerCommFailure(taskInfo.assignedInstance.getHost());
+ dagStats.registerCommFailure(taskInfo.assignedNode.getHost());
} else if (endReason == TaskAttemptEndReason.EXECUTOR_BUSY) {
- dagStats.registerTaskRejected(taskInfo.assignedInstance.getHost());
+ dagStats.registerTaskRejected(taskInfo.assignedNode.getHost());
}
}
if (endReason != null && endReason == TaskAttemptEndReason.NODE_FAILED) {
LOG.info(
- "Task {} ended on {} nodeInfo.toString() with a NODE_FAILED message." +
- " An message should come in from the registry to disable this node unless" +
+ "Task {} ended on {} with a NODE_FAILED message." +
+ " A message should come in from the registry to disable this node unless" +
" this was a temporary communication failure",
- task, assignedInstance);
+ task, nodeInfo.toShortString());
}
boolean commFailure =
endReason != null && endReason == TaskAttemptEndReason.COMMUNICATION_ERROR;
- disableInstance(assignedInstance, commFailure);
+ disableNode(nodeInfo, commFailure);
}
}
} finally {
@@ -668,15 +677,6 @@ public class LlapTaskSchedulerService extends TaskScheduler {
return true;
}
- private void reinsertNodeInfo(final NodeInfo nodeInfo) {
- if ( disabledNodesQueue.remove(nodeInfo)) {
- disabledNodesQueue.add(nodeInfo);
- }
- if (metrics != null) {
- metrics.setDisabledNodeCount(disabledNodesQueue.size());
- }
- }
-
@Override
public Object deallocateContainer(ContainerId containerId) {
LOG.debug("Ignoring deallocateContainer for containerId: " + containerId);
@@ -730,11 +730,11 @@ public class LlapTaskSchedulerService extends TaskScheduler {
if (nodeInfo.canAcceptTask()) {
// Successfully scheduled.
LOG.info(
- "Assigning " + nodeToString(inst, nodeInfo) + " when looking for " + host +
+ "Assigning " + nodeInfo.toShortString() + " when looking for " + host +
". local=true" + " FirstRequestedHost=" + (prefHostCount == 0) +
(requestedHosts.length > 1 ? ", #prefLocations=" + requestedHosts.length :
""));
- return new SelectHostResult(inst, nodeInfo);
+ return new SelectHostResult(nodeInfo);
} else {
// The node cannot accept a task at the moment.
if (shouldDelayForLocality) {
@@ -755,8 +755,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
}
} else {
LOG.warn(
- "Null NodeInfo when attempting to get host with worker identity {}, and host {}",
- inst.getWorkerIdentity(), host);
+ "Null NodeInfo when attempting to get host with worker {}, and host {}",
+ inst, host);
// Leave requestedHostWillBecomeAvailable as is. If some other host is found - delay,
// else ends up allocating to a random host immediately.
}
@@ -798,56 +798,46 @@ public class LlapTaskSchedulerService extends TaskScheduler {
return SELECT_HOST_RESULT_DELAYED_RESOURCES;
}
NodeInfo randomNode = all.get(random.nextInt(all.size()));
- LOG.info("Assigning " + nodeToString(randomNode.getServiceInstance(), randomNode)
+ LOG.info("Assigning " + randomNode.toShortString()
+ " when looking for any host, from #hosts=" + all.size() + ", requestedHosts="
+ ((requestedHosts == null || requestedHosts.length == 0)
? "null" : Arrays.toString(requestedHosts)));
- return new SelectHostResult(randomNode.getServiceInstance(), randomNode);
+ return new SelectHostResult(randomNode);
} finally {
readLock.unlock();
}
}
- private void scanForNodeChanges() {
- /* check again whether nodes are disabled or just missing */
- writeLock.lock();
- try {
- for (ServiceInstance inst : activeInstances.getAll()) {
- if (inst.isAlive() && instanceToNodeMap.containsKey(inst.getWorkerIdentity()) == false) {
- /* that's a good node, not added to the allocations yet */
- LOG.info("Found a new node: " + inst + ".");
- addNode(inst, new NodeInfo(inst, nodeBlacklistConf, clock, numSchedulableTasksPerNode,
- metrics));
- }
- }
- } finally {
- writeLock.unlock();
- }
- }
-
- private void addNode(ServiceInstance inst, NodeInfo node) {
+ private void addNode(NodeInfo node, ServiceInstance serviceInstance) {
// we have just added a new node. Signal timeout monitor to reset timer
if (activeInstances.size() == 1) {
LOG.info("New node added. Signalling scheduler timeout monitor thread to stop timer.");
stopTimeoutMonitor();
}
- instanceToNodeMap.put(inst.getWorkerIdentity(), node);
+
+ NodeReport nodeReport = constructNodeReport(serviceInstance, true);
+ getContext().nodesUpdated(Collections.singletonList(nodeReport));
+
+ instanceToNodeMap.put(node.getNodeIdentity(), node);
if (metrics != null) {
metrics.setClusterNodeCount(activeInstances.size());
}
// Trigger scheduling since a new node became available.
+ LOG.info("Adding new node: {}", node);
trySchedulingPendingTasks();
}
private void reenableDisabledNode(NodeInfo nodeInfo) {
writeLock.lock();
try {
- LOG.info("Attempting to re-enable node: " + nodeInfo.getServiceInstance());
- if (nodeInfo.getServiceInstance().isAlive()) {
+ LOG.info("Attempting to re-enable node: " + nodeInfo.toShortString());
+ if (activeInstances.getInstance(nodeInfo.getNodeIdentity()) != null) {
nodeInfo.enableNode();
} else {
if (LOG.isInfoEnabled()) {
- LOG.info("Removing dead node " + nodeInfo);
+ LOG.info(
+ "Not re-enabling node: {}, since it is not present in the RegistryActiveNodeList",
+ nodeInfo.toShortString());
}
}
} finally {
@@ -855,13 +845,32 @@ public class LlapTaskSchedulerService extends TaskScheduler {
}
}
- private void disableInstance(ServiceInstance instance, boolean isCommFailure) {
+ /**
+ * Updates relevant structures on the node, and fixes the position in the disabledNodesQueue
+ * to facilitate the actual re-enablement of the node.
+ * @param nodeInfo the node to be re-enabled
+ */
+ private void queueNodeForReEnablement(final NodeInfo nodeInfo) {
+ nodeInfo.enableNode();
+ if ( disabledNodesQueue.remove(nodeInfo)) {
+ disabledNodesQueue.add(nodeInfo);
+ }
+ if (metrics != null) {
+ metrics.setDisabledNodeCount(disabledNodesQueue.size());
+ }
+ }
+
+ private void disableNode(NodeInfo nodeInfo, boolean isCommFailure) {
writeLock.lock();
try {
- NodeInfo nodeInfo = instanceToNodeMap.get(instance.getWorkerIdentity());
if (nodeInfo == null || nodeInfo.isDisabled()) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Node: " + instance + " already disabled, or invalid. Not doing anything.");
+ if (nodeInfo != null) {
+ LOG.debug("Node: " + nodeInfo.toShortString() +
+ " already disabled, or invalid. Not doing anything.");
+ } else {
+ LOG.debug("Ignoring disableNode invocation for null NodeInfo");
+ }
}
} else {
nodeInfo.disableNode(isCommFailure);
@@ -879,6 +888,16 @@ public class LlapTaskSchedulerService extends TaskScheduler {
}
}
+ private static NodeReport constructNodeReport(ServiceInstance serviceInstance,
+ boolean healthy) {
+ NodeReport nodeReport = NodeReport.newInstance(NodeId
+ .newInstance(serviceInstance.getHost(), serviceInstance.getRpcPort()),
+ healthy ? NodeState.RUNNING : NodeState.LOST,
+ serviceInstance.getServicesAddress(), null, null,
+ null, 0, "", 0l);
+ return nodeReport;
+ }
+
private void addPendingTask(TaskInfo taskInfo) {
writeLock.lock();
try {
@@ -1124,21 +1143,21 @@ public class LlapTaskSchedulerService extends TaskScheduler {
private ScheduleResult scheduleTask(TaskInfo taskInfo) {
SelectHostResult selectHostResult = selectHost(taskInfo);
if (selectHostResult.scheduleResult == ScheduleResult.SCHEDULED) {
- NodeServiceInstancePair nsPair = selectHostResult.nodeServiceInstancePair;
+ NodeInfo nodeInfo = selectHostResult.nodeInfo;
Container container =
containerFactory.createContainer(resourcePerExecutor, taskInfo.priority,
- nsPair.getServiceInstance().getHost(),
- nsPair.getServiceInstance().getRpcPort(),
- nsPair.getServiceInstance().getServicesAddress());
+ nodeInfo.getHost(),
+ nodeInfo.getRpcPort(),
+ nodeInfo.getServiceAddress());
writeLock.lock(); // While updating local structures
try {
- LOG.info("Assigned task {} to container {} on node={}", taskInfo, container.getId(),
- nsPair.getServiceInstance());
+ LOG.info("Assigned task={} on node={}, to container={} on node={}",
+ taskInfo, nodeInfo.toShortString(), container.getId());
dagStats.registerTaskAllocated(taskInfo.requestedHosts, taskInfo.requestedRacks,
- nsPair.getServiceInstance().getHost());
- taskInfo.setAssignmentInfo(nsPair.getServiceInstance(), container.getId(), clock.getTime());
+ nodeInfo.getHost());
+ taskInfo.setAssignmentInfo(nodeInfo, container.getId(), clock.getTime());
registerRunningTask(taskInfo);
- nsPair.getNodeInfo().registerTaskScheduled();
+ nodeInfo.registerTaskScheduled();
} finally {
writeLock.unlock();
}
@@ -1169,7 +1188,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
Iterator<TaskInfo> taskInfoIterator = entryAtPriority.getValue().iterator();
while (taskInfoIterator.hasNext() && preemptedCount < numTasksToPreempt) {
TaskInfo taskInfo = taskInfoIterator.next();
- if (preemptHosts == null || preemptHosts.contains(taskInfo.assignedInstance.getHost())) {
+ if (preemptHosts == null || preemptHosts.contains(taskInfo.assignedNode.getHost())) {
// Candidate for preemption.
preemptedCount++;
LOG.info("preempting {} for task at priority {} with potentialHosts={}", taskInfo,
@@ -1178,9 +1197,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
if (preemptedTaskList == null) {
preemptedTaskList = new LinkedList<>();
}
- dagStats.registerTaskPreempted(taskInfo.assignedInstance.getHost());
+ dagStats.registerTaskPreempted(taskInfo.assignedNode.getHost());
preemptedTaskList.add(taskInfo);
- registerPendingPreemption(taskInfo.assignedInstance.getHost());
+ registerPendingPreemption(taskInfo.assignedNode.getHost());
// Remove from the runningTaskList
taskInfoIterator.remove();
}
@@ -1256,14 +1275,6 @@ public class LlapTaskSchedulerService extends TaskScheduler {
}
}
- private String nodeToString(ServiceInstance serviceInstance, NodeInfo nodeInfo) {
- return serviceInstance.getHost() + ":" + serviceInstance.getRpcPort() + ", workerIdentity=" +
- serviceInstance.getWorkerIdentity() + ", webAddress=" +
- serviceInstance.getServicesAddress() + ", currentlyScheduledTasksOnNode=" + nodeInfo.numScheduledTasks;
- }
-
-
-
// ------ Inner classes defined after this point ------
@VisibleForTesting
@@ -1322,37 +1333,19 @@ public class LlapTaskSchedulerService extends TaskScheduler {
private class NodeEnablerCallable implements Callable<Void> {
private final AtomicBoolean isShutdown = new AtomicBoolean(false);
- private static final long REFRESH_INTERVAL = 10000l;
- long nextPollInterval = REFRESH_INTERVAL;
- long lastRefreshTime;
+ private static final long POLL_TIMEOUT = 10000L;
@Override
public Void call() {
- lastRefreshTime = LlapTaskSchedulerService.this.clock.getTime();
while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) {
try {
- while (true) {
- NodeInfo nodeInfo = disabledNodesQueue.poll(nextPollInterval, TimeUnit.MILLISECONDS);
- if (nodeInfo != null) {
- long currentTime = LlapTaskSchedulerService.this.clock.getTime();
- // A node became available. Enable the node and try scheduling.
- reenableDisabledNode(nodeInfo);
- trySchedulingPendingTasks();
-
- nextPollInterval -= (currentTime - lastRefreshTime);
- }
-
- if (nextPollInterval < 0 || nodeInfo == null) {
- // timeout expired. Reset the poll interval and refresh nodes.
- nextPollInterval = REFRESH_INTERVAL;
- lastRefreshTime = LlapTaskSchedulerService.this.clock.getTime();
- // TODO Get rid of this polling once we have notificaitons from the registry sub-system
- if (LOG.isDebugEnabled()) {
- LOG.debug("Refreshing instances based on poll interval");
- }
- scanForNodeChanges();
- }
+ NodeInfo nodeInfo =
+ disabledNodesQueue.poll(POLL_TIMEOUT, TimeUnit.MILLISECONDS);
+ if (nodeInfo != null) {
+ // A node became available. Enable the node and try scheduling.
+ reenableDisabledNode(nodeInfo);
+ trySchedulingPendingTasks();
}
} catch (InterruptedException e) {
if (isShutdown.get()) {
@@ -1393,8 +1386,9 @@ public class LlapTaskSchedulerService extends TaskScheduler {
getContext().reportError(ServicePluginErrorDefaults.SERVICE_UNAVAILABLE,
"No LLAP Daemons are running", getContext().getCurrentDagInfo());
} catch (Exception e) {
+ DagInfo currentDagInfo = getContext().getCurrentDagInfo();
LOG.error("Exception when reporting SERVICE_UNAVAILABLE error for dag: {}",
- getContext().getCurrentDagInfo().getName(), e);
+ currentDagInfo == null ? "" : currentDagInfo.getName(), e);
}
}
}
@@ -1505,8 +1499,20 @@ public class LlapTaskSchedulerService extends TaskScheduler {
}
- ServiceInstance getServiceInstance() {
- return serviceInstance;
+ String getNodeIdentity() {
+ return serviceInstance.getWorkerIdentity();
+ }
+
+ String getHost() {
+ return serviceInstance.getHost();
+ }
+
+ int getRpcPort() {
+ return serviceInstance.getRpcPort();
+ }
+
+ String getServiceAddress() {
+ return serviceInstance.getServicesAddress();
}
void enableNode() {
@@ -1535,7 +1541,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
delayTime = blacklistConf.maxDelay;
}
if (LOG.isInfoEnabled()) {
- LOG.info("Disabling instance {} for {} milli-seconds. commFailure={}", serviceInstance,
+ LOG.info("Disabling instance {} for {} milli-seconds. commFailure={}",
+ serviceInstance,
delayTime, commFailure);
}
expireTimeMillis = currentTime + delayTime;
@@ -1577,7 +1584,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
/**
* @return the time at which this node will be re-enabled
*/
- public long getEnableTime() {
+ long getEnableTime() {
return expireTimeMillis;
}
@@ -1585,22 +1592,21 @@ public class LlapTaskSchedulerService extends TaskScheduler {
return disabled;
}
- public boolean hadCommFailure() {
+ boolean hadCommFailure() {
return hadCommFailure;
}
/* Returning true does not guarantee that the task will run, considering other queries
may be running in the system. Also depends upon the capacity usage configuration
*/
- public boolean canAcceptTask() {
- boolean result = !hadCommFailure && !disabled && serviceInstance.isAlive()
+ boolean canAcceptTask() {
+ boolean result = !hadCommFailure && !disabled
&&(numSchedulableTasks == -1 || ((numSchedulableTasks - numScheduledTasks) > 0));
if (LOG.isInfoEnabled()) {
LOG.info("Node[" + serviceInstance.getHost() + ":" + serviceInstance.getRpcPort() + ", " +
serviceInstance.getWorkerIdentity() + "]: " +
- "canAcceptTask={}, numScheduledTasks={}, numSchedulableTasks={}, hadCommFailure={}, disabled={}, serviceInstance.isAlive={}",
- result, numScheduledTasks, numSchedulableTasks, hadCommFailure, disabled,
- serviceInstance.isAlive());
+ "canAcceptTask={}, numScheduledTasks={}, numSchedulableTasks={}, hadCommFailure={}, disabled={}",
+ result, numScheduledTasks, numSchedulableTasks, hadCommFailure, disabled);
}
return result;
}
@@ -1634,6 +1640,13 @@ public class LlapTaskSchedulerService extends TaskScheduler {
+ ", commFailures=" + hadCommFailure
+'}';
}
+
+ private String toShortString() {
+ return "{" + serviceInstance.getHost() + ":" +
+ serviceInstance.getRpcPort() + ", id=" + getNodeIdentity() +
+ ", stc=" + numSchedulableTasks + "}";
+ }
+
}
@VisibleForTesting
@@ -1699,6 +1712,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
_registerAllocationInHostMap(allocatedHost, numAllocationsPerHost);
}
+ // TODO Track stats of rejections etc per host
void registerTaskPreempted(String host) {
numPreemptedTasks++;
}
@@ -1753,7 +1767,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
long startTime;
long preemptTime;
ContainerId containerId;
- ServiceInstance assignedInstance;
+ NodeInfo assignedNode;
private State state = State.PENDING;
boolean inDelayedQueue = false;
@@ -1782,8 +1796,8 @@ public class LlapTaskSchedulerService extends TaskScheduler {
this.uniqueId = ID_GEN.getAndIncrement();
}
- synchronized void setAssignmentInfo(ServiceInstance instance, ContainerId containerId, long startTime) {
- this.assignedInstance = instance;
+ synchronized void setAssignmentInfo(NodeInfo nodeInfo, ContainerId containerId, long startTime) {
+ this.assignedNode = nodeInfo;
this.containerId = containerId;
this.startTime = startTime;
this.state = State.ASSIGNED;
@@ -1859,7 +1873,7 @@ public class LlapTaskSchedulerService extends TaskScheduler {
", priority=" + priority +
", startTime=" + startTime +
", containerId=" + containerId +
- ", assignedInstance=" + assignedInstance +
+ ", assignedNode=" + (assignedNode == null ? "" : assignedNode.toShortString()) +
", uniqueId=" + uniqueId +
", localityDelayTimeout=" + localityDelayTimeout +
'}';
@@ -1907,16 +1921,16 @@ public class LlapTaskSchedulerService extends TaskScheduler {
}
private static class SelectHostResult {
- final NodeServiceInstancePair nodeServiceInstancePair;
+ final NodeInfo nodeInfo;
final ScheduleResult scheduleResult;
- SelectHostResult(ServiceInstance serviceInstance, NodeInfo nodeInfo) {
- this.nodeServiceInstancePair = new NodeServiceInstancePair(serviceInstance, nodeInfo);
+ SelectHostResult(NodeInfo nodeInfo) {
+ this.nodeInfo = nodeInfo;
this.scheduleResult = ScheduleResult.SCHEDULED;
}
SelectHostResult(ScheduleResult scheduleResult) {
- this.nodeServiceInstancePair = null;
+ this.nodeInfo = null;
this.scheduleResult = scheduleResult;
}
}
@@ -1928,24 +1942,6 @@ public class LlapTaskSchedulerService extends TaskScheduler {
private static final SelectHostResult SELECT_HOST_RESULT_DELAYED_RESOURCES =
new SelectHostResult(ScheduleResult.DELAYED_RESOURCES);
- private static class NodeServiceInstancePair {
- final NodeInfo nodeInfo;
- final ServiceInstance serviceInstance;
-
- private NodeServiceInstancePair(ServiceInstance serviceInstance, NodeInfo nodeInfo) {
- this.serviceInstance = serviceInstance;
- this.nodeInfo = nodeInfo;
- }
-
- public ServiceInstance getServiceInstance() {
- return serviceInstance;
- }
-
- public NodeInfo getNodeInfo() {
- return nodeInfo;
- }
- }
-
private static final class NodeBlacklistConf {
private final long minDelay;
private final long maxDelay;
@@ -1986,4 +1982,5 @@ public class LlapTaskSchedulerService extends TaskScheduler {
'}';
}
}
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/4e075468/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
index 402658b..85d2bcd 100644
--- a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
+++ b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
@@ -210,7 +210,7 @@ public class TestLlapTaskSchedulerService {
assertEquals(3, tsWrapper.ts.instanceToNodeMap.size());
LlapTaskSchedulerService.NodeInfo disabledNodeInfo = tsWrapper.ts.disabledNodesQueue.peek();
assertNotNull(disabledNodeInfo);
- assertEquals(HOST1, disabledNodeInfo.serviceInstance.getHost());
+ assertEquals(HOST1, disabledNodeInfo.getHost());
assertEquals((10000l), disabledNodeInfo.getDelay(TimeUnit.MILLISECONDS));
assertEquals((10000l + 10000l), disabledNodeInfo.expireTimeMillis);