You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2016/10/01 01:08:47 UTC
[05/44] hive git commit: HIVE-14624 : LLAP: Use FQDN when submitting work to LLAP (Sergey Shelukhin, reviewed by Siddharth Seth)
HIVE-14624 : LLAP: Use FQDN when submitting work to LLAP (Sergey Shelukhin, reviewed by Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/19774029
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/19774029
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/19774029
Branch: refs/heads/hive-14535
Commit: 19774029c4c1d90982354c36840bb485d74faaf1
Parents: e297a15
Author: Sergey Shelukhin <se...@apache.org>
Authored: Tue Sep 20 11:30:49 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Tue Sep 20 11:30:59 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 3 +++
.../java/org/apache/hadoop/hive/llap/LlapUtil.java | 12 ++++++++++++
.../apache/hadoop/hive/llap/LlapBaseInputFormat.java | 6 +++---
.../hive/llap/tezplugins/LlapTaskCommunicator.java | 14 ++++++++++----
.../llap/tezplugins/TestLlapTaskCommunicator.java | 5 +++++
5 files changed, 33 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/19774029/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 301159e..ccdfca6 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -393,6 +393,7 @@ public class HiveConf extends Configuration {
llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_CONTAINER_ID.varname);
llapDaemonVarsSetLocal.add(ConfVars.LLAP_VALIDATE_ACLS.varname);
llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_LOGGER.varname);
+ llapDaemonVarsSetLocal.add(ConfVars.LLAP_DAEMON_AM_USE_FQDN.varname);
}
/**
@@ -2909,6 +2910,8 @@ public class HiveConf extends Configuration {
new TimeValidator(TimeUnit.MILLISECONDS),
"Amount of time to wait on connection failures to the AM from an LLAP daemon before\n" +
"considering the AM to be dead.", "llap.am.liveness.connection.timeout-millis"),
+ LLAP_DAEMON_AM_USE_FQDN("hive.llap.am.use.fqdn", false,
+ "Whether to use FQDN of the AM machine when submitting work to LLAP."),
// Not used yet - since the Writable RPC engine does not support this policy.
LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MS(
"hive.llap.am.liveness.connection.sleep.between.retries.ms", "2000ms",
http://git-wip-us.apache.org/repos/asf/hive/blob/19774029/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
----------------------------------------------------------------------
diff --git a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
index 0c04d9d..8352943 100644
--- a/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
+++ b/llap-common/src/java/org/apache/hadoop/hive/llap/LlapUtil.java
@@ -14,6 +14,8 @@
package org.apache.hadoop.hive.llap;
import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -25,6 +27,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto.Builder;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
@@ -180,4 +183,13 @@ public class LlapUtil {
return sb.toString();
}
}
+
+ public static String getAmHostNameFromAddress(InetSocketAddress address, Configuration conf) {
+ if (!HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_DAEMON_AM_USE_FQDN)) {
+ return address.getHostName();
+ }
+ InetAddress ia = address.getAddress();
+ // getCanonicalHostName would either return FQDN, or an IP.
+ return (ia == null) ? address.getHostName() : ia.getCanonicalHostName();
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/19774029/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
index 7dae4fc..288a8eb 100644
--- a/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
+++ b/llap-ext-client/src/java/org/apache/hadoop/hive/llap/LlapBaseInputFormat.java
@@ -160,7 +160,7 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
SubmitWorkRequestProto request = constructSubmitWorkRequestProto(
submitWorkInfo, llapSplit.getSplitNum(), attemptNum, llapClient.getAddress(),
submitWorkInfo.getToken(), llapSplit.getFragmentBytes(),
- llapSplit.getFragmentBytesSignature());
+ llapSplit.getFragmentBytesSignature(), job);
llapClient.submitWork(request, host, llapSubmitPort);
Socket socket = new Socket(host, serviceInstance.getOutputFormatPort());
@@ -290,7 +290,7 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
private SubmitWorkRequestProto constructSubmitWorkRequestProto(SubmitWorkInfo submitWorkInfo,
int taskNum, int attemptNum, InetSocketAddress address, Token<JobTokenIdentifier> token,
- byte[] fragmentBytes, byte[] fragmentBytesSignature) throws IOException {
+ byte[] fragmentBytes, byte[] fragmentBytesSignature, JobConf job) throws IOException {
ApplicationId appId = submitWorkInfo.getFakeAppId();
// This works, assuming the executor is running within YARN.
@@ -325,7 +325,7 @@ public class LlapBaseInputFormat<V extends WritableComparable<?>>
builder.setFragmentNumber(taskNum);
builder.setAttemptNumber(attemptNum);
builder.setContainerIdString(containerId.toString());
- builder.setAmHost(address.getHostName());
+ builder.setAmHost(LlapUtil.getAmHostNameFromAddress(address, job));
builder.setAmPort(address.getPort());
builder.setCredentialsBinary(ByteString.copyFrom(credentialsBinary));
builder.setFragmentRuntimeInfo(runtimeInfo.build());
http://git-wip-us.apache.org/repos/asf/hive/blob/19774029/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index d1d2ad4..7dd778d 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.LlapNodeId;
+import org.apache.hadoop.hive.llap.LlapUtil;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
@@ -108,6 +109,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
private final LlapTaskUmbilicalProtocol umbilical;
private final Token<LlapTokenIdentifier> token;
private final String user;
+ private String amHost;
// These two structures track the list of known nodes, and the list of nodes which are sending in keep-alive heartbeats.
// Primarily for debugging purposes a.t.m, since there's some unexplained TASK_TIMEOUTS which are currently being observed.
@@ -218,9 +220,9 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
server.start();
this.address = NetUtils.getConnectAddress(server);
- LOG.info(
- "Started LlapUmbilical: " + umbilical.getClass().getName() + " at address: " + address +
- " with numHandlers=" + numHandlers);
+ this.amHost = LlapUtil.getAmHostNameFromAddress(address, conf);
+ LOG.info("Started LlapUmbilical: " + umbilical.getClass().getName() + " at address: "
+ + address + " with numHandlers=" + numHandlers + " using the host name " + amHost);
} catch (IOException e) {
throw new TezUncheckedException(e);
}
@@ -610,7 +612,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
builder.setFragmentNumber(taskSpec.getTaskAttemptID().getTaskID().getId());
builder.setAttemptNumber(taskSpec.getTaskAttemptID().getId());
builder.setContainerIdString(containerId.toString());
- builder.setAmHost(getAddress().getHostName());
+ builder.setAmHost(getAmHostString());
builder.setAmPort(getAddress().getPort());
Preconditions.checkState(currentQueryIdentifierProto.getDagIndex() ==
@@ -842,4 +844,8 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl {
.setAppAttemptNumber(getContext().getApplicationAttemptId().getAttemptId())
.build();
}
+
+ public String getAmHostString() {
+ return amHost;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/19774029/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
index 0f28f70..5efe7c6 100644
--- a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
+++ b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
@@ -417,6 +417,11 @@ public class TestLlapTaskCommunicator {
public InetSocketAddress getAddress() {
return InetSocketAddress.createUnresolved("localhost", 15001);
}
+
+ @Override
+ public String getAmHostString() {
+ return "localhost";
+ }
}
}