You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2020/02/21 16:22:57 UTC
[hive] branch master updated: HIVE-22359 : LLAP: when a node restarts with the exact same host/port in kubernetes it is not detected as a task failure (Prasanth J via Gopal V)
This is an automated email from the ASF dual-hosted git repository.
hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 1d8ce84 HIVE-22359 : LLAP: when a node restarts with the exact same host/port in kubernetes it is not detected as a task failure (Prasanth J via Gopal V)
1d8ce84 is described below
commit 1d8ce84f5f459f17220bb81558432d81cbe956dd
Author: Prasanth Jayachandran <pr...@apache.org>
AuthorDate: Fri Feb 21 08:22:05 2020 -0800
HIVE-22359 : LLAP: when a node restarts with the exact same host/port in kubernetes it is not detected as a task failure (Prasanth J via Gopal V)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
---
.../hive/llap/tezplugins/LlapTaskCommunicator.java | 19 +++++++++++++------
1 file changed, 13 insertions(+), 6 deletions(-)
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index dc10f22..e5dc378 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -724,13 +724,13 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
}
}
- public void registerPingingNode(LlapNodeId nodeId) {
+ public void registerPingingNode(LlapNodeId nodeId, String uniqueId) {
long currentTs = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
PingingNodeInfo ni = new PingingNodeInfo(currentTs);
PingingNodeInfo old = pingedNodeMap.put(nodeId, ni);
if (old == null) {
if (LOG.isInfoEnabled()) {
- LOG.info("Added new pinging node: [{}]", nodeId);
+ LOG.info("Added new pinging node: [{}] with uniqueId: {}", nodeId, uniqueId);
}
} else {
old.pingCount.incrementAndGet();
@@ -758,7 +758,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
TezAttemptArray tasks, BooleanArray guaranteed) {
// TODO: do we ever need the port? we could just do away with nodeId altogether.
LlapNodeId nodeId = LlapNodeId.getInstance(hostname, port);
- registerPingingNode(nodeId);
+ registerPingingNode(nodeId, uniqueId);
BiMap<ContainerId, TezTaskAttemptID> biMap =
entityTracker.getContainerAttemptMapForNode(nodeId);
if (biMap != null) {
@@ -793,9 +793,16 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
getContext().containerAlive(entry.getKey());
}
}
- }
- if (!error.isEmpty()) {
- LOG.info("The tasks we expected to be on the node are not there: " + error);
+
+ if (!error.isEmpty()) {
+ LOG.info("The tasks we expected to be on the node are not there: " + error);
+ for (Map.Entry<ContainerId, TezTaskAttemptID> entry : biMap.entrySet()) {
+ LOG.info("Sending a kill for attempt {}, due to a ping from node with same host and same port but " +
+ "registered with different unique ID", entry.getValue());
+ getContext().taskKilled(entry.getValue(), TaskAttemptEndReason.NODE_FAILED,
+ "Node with same host and port but with new unique ID pinged");
+ }
+ }
}
} else {
long currentTs = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);