You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jd...@apache.org on 2016/03/18 00:13:12 UTC
hive git commit: HIVE-13305: LlapInputFormat should get LlapOutputFormatService port from the LLAP service registry
Repository: hive
Updated Branches:
refs/heads/llap 2945c3b2d -> 25140659c
HIVE-13305: LlapInputFormat should get LlapOutputFormatService port from the LLAP service registry
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/25140659
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/25140659
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/25140659
Branch: refs/heads/llap
Commit: 25140659c578e3e01a3ce36ba5108a38303dc843
Parents: 2945c3b
Author: Jason Dere <jd...@hortonworks.com>
Authored: Thu Mar 17 16:12:34 2016 -0700
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Thu Mar 17 16:12:34 2016 -0700
----------------------------------------------------------------------
.../hive/llap/registry/ServiceInstance.java | 7 ++
.../registry/impl/LlapFixedRegistryImpl.java | 7 ++
.../impl/LlapZookeeperRegistryImpl.java | 16 ++++
.../hadoop/hive/llap/LlapInputFormat.java | 78 ++++++++++++++++++--
4 files changed, 102 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/25140659/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java
index 2bd860a..a504146 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstance.java
@@ -54,6 +54,13 @@ public interface ServiceInstance {
public int getShufflePort();
/**
+ * Port of the LlapOutputFormatService endpoint advertised by this service instance.
+ *
+ * @return the output format service port
+ */
+ public int getOutputFormatPort();
+
+ /**
* Return the last known state (without refreshing)
*
* @return
http://git-wip-us.apache.org/repos/asf/hive/blob/25140659/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
index 8cace8f..33ab591 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
@@ -52,6 +52,7 @@ public class LlapFixedRegistryImpl implements ServiceRegistry {
private final int port;
private final int shuffle;
private final int mngPort;
+ private final int outputFormatPort;
private final String[] hosts;
private final int memory;
private final int vcores;
@@ -65,6 +66,7 @@ public class LlapFixedRegistryImpl implements ServiceRegistry {
this.shuffle = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_YARN_SHUFFLE_PORT);
this.resolveHosts = conf.getBoolean(FIXED_REGISTRY_RESOLVE_HOST_NAMES, true);
this.mngPort = HiveConf.getIntVar(conf, ConfVars.LLAP_MANAGEMENT_RPC_PORT);
+ this.outputFormatPort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT);
for (Map.Entry<String, String> kv : conf) {
if (kv.getKey().startsWith(HiveConf.PREFIX_LLAP)
@@ -151,6 +153,11 @@ public class LlapFixedRegistryImpl implements ServiceRegistry {
}
+ // Every instance handed out by this fixed registry reports the same port: the outer
+ // registry's outputFormatPort, read from ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT when the
+ // registry was constructed.
@Override
+ public int getOutputFormatPort() {
+ return LlapFixedRegistryImpl.this.outputFormatPort;
+ }
+
+ @Override
public boolean isAlive() {
return true;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/25140659/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
index ab9fa39..c440e1e 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
@@ -90,6 +90,7 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
private static final String IPC_MNG = "llapmng";
private static final String IPC_SHUFFLE = "shuffle";
private static final String IPC_LLAP = "llap";
+ private static final String IPC_OUTPUTFORMAT = "llapoutputformat";
private final static String ROOT_NAMESPACE = "llap";
private final Configuration conf;
@@ -241,6 +242,11 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
HiveConf.getIntVar(conf, ConfVars.LLAP_MANAGEMENT_RPC_PORT)));
}
+ // Builds an IPC endpoint named "llapoutputformat" (IPC_OUTPUTFORMAT) for this host, using the
+ // port configured by ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT.
+ // NOTE(review): register() is not modified by this commit, so it is not evident that this
+ // endpoint is ever added to the published ServiceRecord. DynamicServiceInstance reads
+ // srv.getInternalEndpoint(IPC_OUTPUTFORMAT) and dereferences it unconditionally; if that lookup
+ // returns null for a missing endpoint, building the instance would fail with an NPE.
+ // Confirm register() publishes this endpoint (e.g. via srv.addInternalEndpoint(...)).
+ public Endpoint getOutputFormatEndpoint() {
+ return RegistryTypeUtils.ipcEndpoint(IPC_OUTPUTFORMAT, new InetSocketAddress(hostname,
+ HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT)));
+ }
+
@Override
public void register() throws IOException {
ServiceRecord srv = new ServiceRecord();
@@ -310,6 +316,7 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
private final int rpcPort;
private final int mngPort;
private final int shufflePort;
+ private final int outputFormatPort;
public DynamicServiceInstance(ServiceRecord srv) throws IOException {
this.srv = srv;
@@ -317,6 +324,7 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
final Endpoint shuffle = srv.getInternalEndpoint(IPC_SHUFFLE);
final Endpoint rpc = srv.getInternalEndpoint(IPC_LLAP);
final Endpoint mng = srv.getInternalEndpoint(IPC_MNG);
+ final Endpoint outputFormat = srv.getInternalEndpoint(IPC_OUTPUTFORMAT);
this.host =
RegistryTypeUtils.getAddressField(rpc.addresses.get(0),
@@ -330,6 +338,9 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
this.shufflePort =
Integer.valueOf(RegistryTypeUtils.getAddressField(shuffle.addresses.get(0),
AddressTypes.ADDRESS_PORT_FIELD));
+ this.outputFormatPort =
+ Integer.valueOf(RegistryTypeUtils.getAddressField(outputFormat.addresses.get(0),
+ AddressTypes.ADDRESS_PORT_FIELD));
}
@Override
@@ -386,6 +397,11 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
return mngPort;
}
+ // Port parsed from the first address of the record's "llapoutputformat" internal endpoint
+ // when this instance was constructed.
+ @Override
+ public int getOutputFormatPort() {
+ return outputFormatPort;
+ }
+
// Relying on the identity hashCode and equality, since refreshing instances retains the old copy
// of an already known instance.
}
http://git-wip-us.apache.org/repos/asf/hive/blob/25140659/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
index b32d662..847c74f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/LlapInputFormat.java
@@ -17,10 +17,12 @@
package org.apache.hadoop.hive.llap;
import java.io.IOException;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.Set;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
@@ -28,6 +30,9 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto;
import org.apache.hadoop.hive.llap.ext.LlapTaskUmbilicalExternalClient;
+import org.apache.hadoop.hive.llap.registry.ServiceInstance;
+import org.apache.hadoop.hive.llap.registry.ServiceInstanceSet;
+import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService;
import org.apache.hadoop.hive.llap.tez.Converters;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -72,8 +77,12 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
LlapInputSplit llapSplit = (LlapInputSplit) split;
SubmitWorkInfo submitWorkInfo = SubmitWorkInfo.fromBytes(llapSplit.getPlanBytes());
- // TODO HACK: Spark is built with Hive-1.2.1, does not have access to HiveConf.ConfVars.LLAP_DAEMON_RPC_PORT
- int llapSubmitPort = job.getInt("hive.llap.daemon.rpc.port", 15001);
+ ServiceInstance serviceInstance = getServiceInstance(job, llapSplit);
+ String host = serviceInstance.getHost();
+ int llapSubmitPort = serviceInstance.getRpcPort();
+
+ LOG.info("Found service instance for host " + host + " with rpc port " + llapSubmitPort
+ + " and outputformat port " + serviceInstance.getOutputFormatPort());
LlapTaskUmbilicalExternalClient llapClient =
new LlapTaskUmbilicalExternalClient(job, submitWorkInfo.getTokenIdentifier(),
@@ -92,16 +101,13 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
List<TezEvent> tezEventList = Lists.newArrayList();
tezEventList.add(tezEvent);
- // this is just the portion that sets up the io to receive data
- String host = split.getLocations()[0];
-
llapClient.submitWork(submitWorkRequestProto, host, llapSubmitPort, tezEventList);
String id = HiveConf.getVar(job, HiveConf.ConfVars.HIVEQUERYID) + "_" + llapSplit.getSplitNum();
HiveConf conf = new HiveConf();
Socket socket = new Socket(host,
- conf.getIntVar(HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT));
+ serviceInstance.getOutputFormatPort());
LOG.debug("Socket connected");
@@ -119,6 +125,66 @@ public class LlapInputFormat<V extends WritableComparable> implements InputForma
throw new IOException("These are not the splits you are looking for.");
}
+ /**
+ * Looks up the live LLAP service instance registered for the first location of the split.
+ *
+ * @param job job configuration used to obtain the LLAP registry client
+ * @param llapSplit split whose first location names the target daemon host
+ * @return a live service instance registered for that host
+ * @throws IOException if the split has no locations, or no live instance is registered for the host
+ */
+ private ServiceInstance getServiceInstance(JobConf job, LlapInputSplit llapSplit) throws IOException {
+ LlapRegistryService registryService = LlapRegistryService.getClient(job);
+ String[] locations = llapSplit.getLocations();
+ // Guard explicitly: indexing an empty/null array would surface as an unchecked exception
+ // with no hint about which split was at fault.
+ if (locations == null || locations.length == 0) {
+ throw new IOException("No locations found for split " + llapSplit.getSplitNum());
+ }
+ String host = locations[0];
+
+ ServiceInstance serviceInstance = getServiceInstanceForHost(registryService, host);
+ if (serviceInstance == null) {
+ throw new IOException("No service instances found for " + host + " in registry");
+ }
+
+ return serviceInstance;
+ }
+
+ /**
+ * Finds a live registry instance for the given host.
+ *
+ * The name a daemon registered under may not match the name we resolve, so the host name,
+ * canonical host name and host address are tried in that order. Each later (possibly DNS-backed)
+ * lookup is only performed if the earlier ones found nothing.
+ *
+ * @param registryService LLAP registry client to query
+ * @param host host name or address taken from the split location
+ * @return the first live instance found, or null if none matches
+ * @throws IOException e.g. UnknownHostException if {@code host} cannot be resolved
+ */
+ private ServiceInstance getServiceInstanceForHost(LlapRegistryService registryService, String host) throws IOException {
+ InetAddress address = InetAddress.getByName(host);
+ ServiceInstanceSet instanceSet = registryService.getInstances();
+
+ ServiceInstance serviceInstance =
+ findServiceInstanceByName(instanceSet, "hostname", address.getHostName());
+ if (serviceInstance == null) {
+ serviceInstance =
+ findServiceInstanceByName(instanceSet, "canonical hostname", address.getCanonicalHostName());
+ }
+ if (serviceInstance == null) {
+ serviceInstance = findServiceInstanceByName(instanceSet, "address", address.getHostAddress());
+ }
+ return serviceInstance;
+ }
+
+ /** Logs the attempt, then returns the first live instance registered under {@code name}, or null. */
+ private ServiceInstance findServiceInstanceByName(ServiceInstanceSet instanceSet, String nameKind,
+ String name) throws IOException {
+ LOG.info("Searching service instance by " + nameKind + " " + name);
+ return selectServiceInstance(instanceSet.getByHost(name));
+ }
+
+ /**
+ * Picks the first instance, in iteration order, that reports itself alive.
+ *
+ * @param serviceInstances candidate instances; may be null or empty
+ * @return a live instance, or null if there is none (a message is logged only when candidates
+ * existed but none were alive)
+ */
+ private ServiceInstance selectServiceInstance(Set<ServiceInstance> serviceInstances) {
+ boolean haveCandidates = serviceInstances != null && !serviceInstances.isEmpty();
+ if (haveCandidates) {
+ for (ServiceInstance candidate : serviceInstances) {
+ if (candidate.isAlive()) {
+ return candidate;
+ }
+ }
+ LOG.info("No live service instances were found");
+ }
+ return null;
+ }
+
private SubmitWorkRequestProto constructSubmitWorkRequestProto(SubmitWorkInfo submitWorkInfo,
int taskNum,
InetSocketAddress address,