You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2020/02/26 23:12:47 UTC
[hive] branch master updated: HIVE-22922: LLAP: ShuffleHandler may
not find shuffle data if pod restarts in k8s (Prasanth Jayachandran
reviewed by Gopal V)
This is an automated email from the ASF dual-hosted git repository.
prasanthj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new f087eac HIVE-22922: LLAP: ShuffleHandler may not find shuffle data if pod restarts in k8s (Prasanth Jayachandran reviewed by Gopal V)
f087eac is described below
commit f087eac18a64f97e890427c5bdeff72269a86b55
Author: Prasanth Jayachandran <pr...@apache.org>
AuthorDate: Wed Feb 26 15:12:33 2020 -0800
HIVE-22922: LLAP: ShuffleHandler may not find shuffle data if pod restarts in k8s (Prasanth Jayachandran reviewed by Gopal V)
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 3 +++
.../hive/llap/daemon/impl/ContainerRunnerImpl.java | 19 ++++++++++++++++++-
2 files changed, 21 insertions(+), 1 deletion(-)
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index e419dc5..bfc2695 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4380,6 +4380,9 @@ public class HiveConf extends Configuration {
"considering the AM to be dead.", "llap.am.liveness.connection.timeout-millis"),
LLAP_DAEMON_AM_USE_FQDN("hive.llap.am.use.fqdn", true,
"Whether to use FQDN of the AM machine when submitting work to LLAP."),
+ LLAP_DAEMON_EXEC_USE_FQDN("hive.llap.exec.use.fqdn", true,
+ "On non-kerberized clusters, where the hostnames are stable but ip address changes, setting this config\n" +
+ " to false will use ip address of llap daemon in execution context instead of FQDN"),
// Not used yet - since the Writable RPC engine does not support this policy.
LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MS(
"hive.llap.am.liveness.connection.sleep.between.retries.ms", "2000ms",
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 2ae7871..6a13b55 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -15,6 +15,7 @@
package org.apache.hadoop.hive.llap.daemon.impl;
import java.io.IOException;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Arrays;
@@ -27,6 +28,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.UgiFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.DaemonId;
import org.apache.hadoop.hive.llap.LlapNodeId;
@@ -117,6 +119,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
private final DaemonId daemonId;
private final UgiFactory fsUgiFactory;
private final SocketFactory socketFactory;
+ private final boolean execUseFQDN;
public ContainerRunnerImpl(Configuration conf, int numExecutors, AtomicReference<Integer> localShufflePort,
AtomicReference<InetSocketAddress> localAddress,
@@ -140,6 +143,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
this.queryTracker = queryTracker;
this.executorService = executorService;
completionListener = (SchedulerFragmentCompletingListener) executorService;
+ this.execUseFQDN = conf.getBoolean(HiveConf.ConfVars.LLAP_DAEMON_EXEC_USE_FQDN.varname, true);
// Distribute the available memory between the tasks.
this.memoryPerExecutor = (long)(totalMemoryAvailableBytes / (float) numExecutors);
@@ -285,10 +289,23 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu
boolean addTaskTimes = callableConf.getBoolean(tezSummary.varname, tezSummary.defaultBoolVal)
&& callableConf.getBoolean(llapTasks.varname, llapTasks.defaultBoolVal);
+ final String llapHost;
+ if (UserGroupInformation.isSecurityEnabled()) {
+ // when kerberos is enabled always use FQDN
+ llapHost = localAddress.get().getHostName();
+ } else if (execUseFQDN) {
+ // when FQDN is explicitly requested (default)
+ llapHost = localAddress.get().getHostName();
+ } else {
+ // when FQDN is not requested, use ip address
+ llapHost = localAddress.get().getAddress().getHostAddress();
+ }
+ LOG.info("Using llap host: {} for execution context. hostName: {} hostAddress: {}", llapHost,
+ localAddress.get().getHostName(), localAddress.get().getAddress().getHostAddress());
// TODO: ideally we'd register TezCounters here, but it seems impossible before registerTask.
WmFragmentCounters wmCounters = new WmFragmentCounters(addTaskTimes);
TaskRunnerCallable callable = new TaskRunnerCallable(request, fragmentInfo, callableConf,
- new ExecutionContextImpl(localAddress.get().getHostName()), env,
+ new ExecutionContextImpl(llapHost), env,
credentials, memoryPerExecutor, amReporter, confParams, metrics, killedTaskHandler,
this, tezHadoopShim, attemptId, vertex, initialEvent, fsTaskUgi,
completionListener, socketFactory, isGuaranteed, wmCounters);